Source code for munetauvsim.environment

"""
Ocean environment modeling for AUV simulation.

Provides comprehensive environmental modeling including ocean currents,
bathymetry, and pollution dispersion for autonomous underwater vehicle
simulations. Supports procedural generation via Perlin noise and configurable
physical parameters.


Classes
-------
Ocean
    Container for all environmental components (current, floor, pollution).
Current1D
    Time-varying ocean current with configurable speed and direction profiles.
Current1DData
    Immutable data container for Current1D state serialization.
Floor
    Ocean floor depth map generated from Perlin noise.
PerlinNoise
    2D Perlin noise generator for terrain and environmental features.
Pollution
    Gaussian plume model for point-source pollution dispersion.

    
Functions
---------
None
    All functionality contained in classes

    
Notes
-----
**Environment Scope:**

Simulates environmental factors affecting vehicle dynamics and sensor readings.
Inputs: None (self-contained generation)
Outputs: Physical properties for dynamics integration and sensor simulation

**Typical Components:**

- Ocean current: Time-varying 1D current fields
- Ocean floor: Procedurally generated depth map
- Pollution plumes: Concentration dispersion from point sources

**Design Philosophy:**

Environment objects are designed for minimal coupling with simulation code.
All components are optional and can be configured independently or omitted.


Examples
--------
### Create calm ocean with default parameters:

>>> import munetauvsim.environment as env
>>> ocean = env.Ocean.calm_ocean(size=1000)
>>> print(ocean)oce

### Custom ocean with stormy conditions:

>>> ocean = env.Ocean(
...     spd='fast', dSpd='unsteady', dtSpd='choppy',
...     ang='any', dAng='varied', dtAng='regular',
...     z=150, z_range=30,
...     randomFloor=True, randomPlume=True
... )

### Individual component construction:

>>> current = env.Current1D(spd=0.5, dSpd=0.1, dtSpd=90.0, 
...                         ang=np.pi/4, dAng=0.2, dtAng=120.0,
...                         nIter=60000, seed=42)
>>> floor = env.Floor(z=125, z_range=15, size=1000, seed=100)
>>> pollution = env.Pollution(source=[200, 300, 25], Q=2.0, 
...                           u=0.8, v=np.pi/6)
>>> ocean = env.Ocean()
>>> ocean.current = current
>>> ocean.floor = floor
>>> ocean.pollution = pollution


See Also
--------
guidance.Waypoint : Path planning coordinates compatible with Floor sampling
navigation.Sensor : Sensor class that collects data from environment
vehicles.Remus100s : Vehicle class that interacts with Ocean environment
simulator.Simulator : Main simulation driver that propagates Ocean data
"""

from functools import singledispatchmethod
from typing import Dict, KeysView, List, Optional, Tuple, Union
from numpy.typing import NDArray
from collections.abc import Generator
from dataclasses import dataclass
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from munetauvsim.guidance import Waypoint
from munetauvsim import logger

#-----------------------------------------------------------------------------#

# Type Aliases
NPFltArr = NDArray[np.float64]
NPIntArr = NDArray[np.int_]
Number = Union[int, float, np.number]

# Global Variables
log = logger.addLog('env')

###############################################################################

[docs]class Ocean: """ Container and manager for ocean environmental components. Aggregates ocean current, floor depth map, and pollution models into a unified environment object. Handles consistent parameterization across components and provides convenience constructors for typical scenarios. Parameters ---------- size : int, default=1000 Side length of square ocean floor area in meters. origin : list of float, default=[500, 500] Coordinates where (x=0, y=0) maps to floor array indices. Allows negative coordinates without array indexing issues. N : int, default=60000 Number of simulation iterations for current time series. h : float, default=0.02 Time step per iteration in seconds (50 Hz default). name : str, default='Ocean' Descriptive name for this ocean environment instance. plume : list of float, default=None Pollution source location [x, y, z] in meters. z is depth (positive). createFloor : bool, default=True If False, no Floor instance is generated. createPlume : bool, default=False If True, creates a pollution plume with emission from source location. currentSeed : int, optional PRNG seed for current generation. None generates random seed. floorSeed : int, default=0 PRNG seed for Perlin noise floor generation. plumeSeed : int, optional PRNG seed for pollution randomization. None generates random seed. randomFloor : bool, default=False If True, generates random floor seed instead of using floorSeed. randomPlume : bool, default=False If True, generates random plume parameters. **kwargs Additional keyword arguments passed to component constructors: - Current1D: spd, dSpd, dtSpd, ang, dAng, dtAng - Floor: z, z_range, style - Pollution: Q, u, v, randomU, randomV Attributes ---------- name : str Ocean environment identifier. N : int Number of simulation iterations (synchronized with current). sampleTime : float Time step in seconds (synchronized with current). size : int Ocean floor area side length in meters. origin : list of float Origin coordinates [x, y] for floor indexing. current : Current1D Ocean current model with time-varying speed and direction. floor : Floor Ocean floor model with Perlin noise terrain. pollution : Pollution or None Pollution plume dispersion model. None if not requested. Methods ------- None Ocean is primarily a container. See component classes for methods. Alternative Constructors ------------------------ calm_ocean(kwargs) Smooth conditions: typical speeds, steady changes, calm rates. Default depth: 200m uniform. dead_ocean(kwargs) Zero current conditions for baseline testing. Constant parameters with no variation. stormy_ocean(kwargs) Volatile conditions: fast speeds, unsteady changes, choppy rates. Notes ----- **Default Construction:** Default Ocean() creates calm environment: - Current Speed: 0.05-0.5 m/s, +/- 0.01-0.1 m/s over 180-300s - Current Direction: Northeastern, +/- 0.1-0.17 rad over 180-300s - Depth: 125-135m (uniform 125m with 10m Perlin noise range) - Area: 1 km^2 - Pollution: No pollution plume - Name: "Ocean" **Property Synchronization:** The N and sampleTime properties automatically update component objects: >>> ocean.N = 10000 # Resizes current.speed and current.angle arrays >>> ocean.sampleTime = 0.01 # Resamples current at 100 Hz Similarly, size and origin update the floor object. **Component Construction:** Ocean constructor is convenience-focused but not exhaustive. For advanced customization, construct components independently: >>> ocean = env.Ocean.dead_ocean(size=2000, origin=[1000,1000]) >>> ocean.pollution = env.Pollution(source=[500, 500, 40], Q=5.0, ... u=1.2, v=np.pi/3) **Design Considerations:** Currently, ocean parameters like current speed/direction are not automatically synchronized between current and pollution objects. Future versions should improve consistency handling. Examples -------- ### Create ocean with calm_ocean constructor: >>> ocean = env.Ocean.calm_ocean(size=1500, origin=[750, 750]) >>> print(ocean) ### Custom ocean from scratch: >>> ocean = env.Ocean( ... name="Test Environment", ... size=2000, ... origin=[1000, 1000], ... N=10000, ... spd='moderate', # 0.5-1.0 m/s ... dSpd='steady', # +/- 0.01-0.1 m/s ... dtSpd=60.0, # 60s half-period ... ang='north', # pi/2 +/- pi/8 ... dAng='constant', # No direction variation ... dtAng=90.0, # 90s half-period ... z=100, # Shallowest depth ... z_range=20, # Depth variation range ... randomFloor=False, ... currentSeed=123, ... floorSeed=456 ... ) ### Dead ocean for control experiments: >>> ocean = env.Ocean.dead_ocean(N=50000) >>> print(f"Current speed: {ocean.current.v_spd} m/s") # 0.0 ### Stormy ocean for stress testing: >>> ocean = env.Ocean.stormy_ocean( ... size=1000, ... z=80, # Shallower water ... z_range=40, # More dramatic terrain ... randomFloor=True ... ) >>> # Fast speeds (1.8-2.5 m/s), unsteady variations ### Accessing components: >>> ocean = env.Ocean.calm_ocean(createPlume=True) >>> # Sample current at iteration 1000 >>> current_speed = ocean.current.speed[1000] >>> current_angle = ocean.current.angle[1000] >>> >>> # Sample floor depth at position (100, 200) >>> depth = ocean.floor(100, 200) >>> >>> # Sample pollution concentration at (150, 150, 25m) >>> concentration = ocean.pollution(150, 150, 25) ### Use in simulation: >>> import munetauvsim as mn >>> ocean = env.Ocean.calm_ocean(size=2000, N=60000) >>> sim = mn.Simulator( ... name="OceanTest", ... ocean=ocean, ... vehicles=[mn.vehicles.Remus100s()] ... ) >>> sim.run() See Also -------- Current1D : Ocean current time series generation Floor : Procedural bathymetry from Perlin noise Pollution : Gaussian plume dispersion model Simulator : Main simulation driver that uses Ocean Warnings -------- - Pollution parameters (u, v) not auto-synced with current (manual update needed) """ ## Constructor ===========================================================#
[docs] def __init__(self, size:int = 1000, origin:List[float] = [500,500], N:int = 60000, h:float = 0.02, name="Ocean", plume:Union[NPFltArr, List[float], None] = None, createFloor:bool = True, createPlume:bool = False, currentSeed:Optional[int] = None, floorSeed:int = 0, plumeSeed:Optional[int] = None, randomFloor:bool = False, randomPlume:bool = False, **kwargs): """ Initialize ocean environment container with current, floor, and pollution components. Constructor that creates a complete ocean environment with synchronized parameters across all components. Handles categorical parameter specification for rapid prototyping or precise numerical control for advanced scenarios. Components can be customized via kwargs or replaced after construction for full flexibility. Parameters ---------- size : int, default=1000 Side length of square ocean floor area in meters. Defines simulation domain extent. Passed to Floor and Pollution. Typical values: - Small test: 500m - Standard ops: 1000-2000m - Large survey: 3000-5000m origin : list of float, default=[500, 500] Coordinates [x_origin, y_origin] where (x=0, y=0) maps in domain. Allows negative coordinate values. Passed to Floor and Pollution. Typical: [size/2, size/2] for centered operations. N : int, default=60000 Number of simulation iterations for current time series. Default 60000 * 0.02s = 1200s = 20 minutes simulation time. Passed to Current1D constructor as nIter. Managed via property: changing N resizes current arrays. h : float, default=0.02 Time step per iteration in seconds (50 Hz sampling rate). Passed to Current1D constructor. Managed via property: changing h resamples current data. Total sim time = N * h seconds. name : str, default="Ocean" Descriptive identifier for this ocean instance. Used in logging, plotting, and __str__ output. No functional impact on simulation. plume : list of float or ndarray, optional, default=None Pollution source location [x, y, z] in meters (END frame). Passed to Pollution constructor as source parameter. z is depth below surface (positive input). If None and createPlume=False, no pollution object is created. If None and createPlume=True, uses a default source of [0, 0, 30]. If not None, creates Pollution regardless of createPlume (backwards compatible). createFloor : bool, default=True If True (default), creates a Floor instance with procedural bathymetry from Perlin noise. If False, sets `floor` to None — useful for deep-ocean scenarios or control experiments where terrain is irrelevant. Floor can also be disabled post-construction by setting `ocean.floor = None`. createPlume : bool, default=False If True, creates a Pollution instance with emission from the specified plume source location. If False and plume is not None, a Pollution instance is still created. currentSeed : int, optional PRNG seed for Current1D generation. If None, Current1D generates random seed (non-reproducible). If provided, ensures deterministic current time series. floorSeed : int, default=0 PRNG seed for Floor/PerlinNoise generation. seed=0 is valid and deterministic. Overridden if randomFloor=True. plumeSeed : int, optional PRNG seed for Pollution randomization (if randomPlume=True). If None and randomPlume=True, generates random seed. If randomPlume=False, plumeSeed has no effect. randomFloor : bool, default=False If True, generates random floor seed (ignores floorSeed parameter). Creates unique terrain each run. Ignored if createFloor=False. randomPlume : bool, default=False If True, randomizes pollution u and v parameters. Generates random plumeSeed if plumeSeed=None. **kwargs : dict Additional keyword arguments passed to component constructors. **Current1D parameters** (see Current1D.__init__ for details): - spd : Mean current speed (float/str/list) - dSpd : Speed variation amplitude (float/str/list) - dtSpd : Speed variation half-period (float/str/list) - ang : Mean current direction (float/str/list) - dAng : Direction variation amplitude (float/str/list) - dtAng : Direction variation half-period (float/str/list) **Floor parameters** (see Floor.__init__ for details): - z : Minimum floor depth (float) - z_range : Depth variation range (float) - style : Terrain generation style (str) **Pollution parameters** (see Pollution.__init__ for details): - Q : Emission rate (float) - u : Wind/current speed (float) - overridden by current.v_spd if not provided - v : Wind/current direction (float) - overridden by current.b_ang if not provided - randomU : Randomize pollution wind speed (bool/list) - randomV : Randomize pollution wind direction (bool/list) Attributes ---------- name : str Ocean identifier string. N : int Number of iterations (managed property, updates current.n). sampleTime : float Time step in seconds (managed property, updates current.h). size : int Floor area dimension (managed property, updates floor.size). origin : list of float Origin coordinates (managed property, updates floor.origin). current : Current1D Time-varying ocean current instance. floor : Floor or None Procedurally generated ocean floor instance. None if createFloor=False or disabled post-construction. pollution : Pollution or None Gaussian plume pollution instance. None if createPlume = False (default). Notes ----- **Construction Process** 1. **Store Basic Parameters:** name, N (via property), sampleTime (via property), size (via property), origin (via property) stored as attributes. 2. **Create Current1D:** >>> self.current = Current1D( ... nIter=N, ... h=h, ... seed=currentSeed, ... spd=kwargs.get('spd', default), ... dSpd=kwargs.get('dSpd', default), ... dtSpd=kwargs.get('dtSpd', default), ... ang=kwargs.get('ang', default), ... dAng=kwargs.get('dAng', default), ... dtAng=kwargs.get('dtAng', default) ... ) Uses constructor defaults if kwargs not provided. 3. **Create Floor:** >>> self.floor = Floor( ... size=size, ... origin=origin, ... seed=floorSeed, ... random=randomFloor, ... z=kwargs.get('z', default), ... z_range=kwargs.get('z_range', default), ... style=kwargs.get('style', 'linear') ... ) 4. **Create Pollution:** >>> self.pollution = Pollution( ... source=plume, ... u=self.current.v_spd, # Sync with current mean speed ... v=self.current.b_ang, # Sync with current mean direction ... oceanSize=size, ... oceanOrigin=origin, ... oceanDepth=self.floor.z, # Sync with floor shallowest depth ... seed=plumeSeed, ... random=randomPlume, ... Q=kwargs.get('Q', default), ... randomU=kwargs.get('randomU', False), ... randomV=kwargs.get('randomV', False) ... ) Pollution u and v default to current parameters unless overridden by explicit kwargs or randomization flags. **Property Synchronization** Ocean properties that automatically propagate changes to components: - **N Property:** Setting ocean.N triggers current.n update: >>> ocean.N = 100000 >>> # Internally: ocean.current.n = 100000 >>> # Current regenerates speed/angle arrays to new length - **sampleTime Property:** Setting ocean.sampleTime triggers current.h update: >>> ocean.sampleTime = 0.01 # 100 Hz >>> # Internally: ocean.current.h = 0.01 >>> # Current resamples data at new rate - **size Property:** Setting ocean.size triggers floor.size update: >>> ocean.size = 2000 >>> # Internally: ocean.floor.size = 2000 >>> # Floor regenerates with extended dimensions - **origin Property:** Setting ocean.origin triggers floor.origin update: >>> ocean.origin = [1000, 1000] >>> # Internally: ocean.floor.origin = [1000, 1000] **Design Limitations:** Current implementation has parameter consistency issues: - Pollution u, v set from current mean values at construction only - Changing current parameters after construction does NOT update pollution - Changing floor z after construction does NOT update pollution oceanDepth Workaround: Manually update pollution parameters after Ocean construction: >>> ocean = Ocean.calm_ocean(createPlume=True) >>> ocean.current.v_spd = 2.0 >>> ocean.pollution.u = 2.0 # Manual sync required Future improvement: Implement pollution property to auto-sync with current/floor. **Alternative Constructors:** Ocean provides convenient factory methods: - calm_ocean(kwargs): Typical coastal conditions (smooth, steady): - spd='typical' (0.05-0.5 m/s) - dSpd='steady' (+/- 0.01-0.1 m/s) - dtSpd='calm' (600s half-period) - ang='northeast' - dAng='steady' - dtAng='calm' - z=200 (if not overridden) - dead_ocean(kwargs): Zero current for baseline testing: - spd='dead' (0.0 m/s) - dSpd='constant' (0.0 variation) - dtSpd='calm' - ang=0 - dAng='constant' - dtAng='calm' - stormy_ocean(kwargs): High-energy, volatile conditions: - spd='fast' (1.8-2.5 m/s) - dSpd='unsteady' (+/- 0.5-1.5 m/s) - dtSpd='choppy' (10-30s half-period) - ang='any' (random direction) - dAng='unsteady' - dtAng='choppy' See Also -------- Ocean.calm_ocean : Factory for typical coastal conditions Ocean.dead_ocean : Factory for zero-current baseline Ocean.stormy_ocean : Factory for high-energy conditions Current1D.__init__ : Current parameter details Floor.__init__ : Floor parameter details Pollution.__init__ : Pollution parameter details Simulator : Main simulation driver using Ocean Examples -------- ### Default calm ocean with pollution: .. code-block:: pycon >>> import munetauvsim.environment as env >>> ocean = env.Ocean(createPlume=True) >>> print(ocean) Ocean: Ocean ---------------------------------------- Current Speed: 0.275 +/-0.055 m/s (180.0s) Angle: 0.785 +/-0.140 rad (180.0s) Seed: 1234567890 Floor Size: 1000 m Origin: [500, 500] Depth: 125 to 135 m Style: linear ... (Perlin noise details) Pollution Source: (0.00, 0.00, 30.00) Strength: 1.59 g/s Speed: 0.28 m/s at 0.79 rad Seed: None ---------------------------------------- ### Custom environment: >>> ocean = env.Ocean( ... name="TestOcean", ... size=2000, ... origin=[1000, 1000], ... N=120000, ... h=0.01, ... spd=0.8, ... dSpd=0.2, ... dtSpd=90.0, ... ang=np.pi/4, ... dAng=0.1, ... dtAng=120.0, ... z=100, ... z_range=25, ... plume=[500, 500, 20], ... Q=2.0, ... currentSeed=42, ... floorSeed=100 ... ) ### Use alternative constructors: >>> calm = env.Ocean.calm_ocean(size=1500, z=180) >>> dead = env.Ocean.dead_ocean(N=30000) >>> stormy = env.Ocean.stormy_ocean(size=3000, randomFloor=True) ### Access components: >>> ocean = env.Ocean.calm_ocean(plume=[-50,-50,80]) >>> current_speed_at_1000 = ocean.current.speed[1000] >>> depth_at_origin = ocean.floor(0, 0) >>> pollution_conc = ocean.pollution(100, 100, 25) ### Modify after construction: >>> ocean = env.Ocean() >>> ocean.N = 100000 # Extend simulation time >>> ocean.size = 2000 # Expand floor area >>> ocean.pollution = env.Pollution(source=[200, 200, 25], Q=5.0) ### Synchronized parameters: >>> ocean = env.Ocean(createPlume=True) >>> print(f"Current mean: {ocean.current.v_spd:.2f} m/s") >>> print(f"Pollution u: {ocean.pollution.u:.2f} m/s") >>> # Should match at construction Warnings -------- - Pollution parameters NOT automatically synchronized with current changes after construction. Manual updates required. """ self.name = name self.N = N self.sampleTime = h self.size = size self.origin = origin # Current1D instance self.current = Current1D(nIter=N, h=h, seed=currentSeed, **kwargs) # Floor instance if (createFloor): self.floor = Floor(size=size, origin=origin, seed=floorSeed, random=randomFloor, **kwargs) else: self.floor = None # Pollution instance if ((plume is not None) or (createPlume)): if (plume is None): plume = [0, 0, 30] oceanDepth = (self.floor.z if (self.floor is not None) else kwargs.get('z', 6000)) self.pollution = Pollution(source=plume, u=self.current.v_spd, v=self.current.b_ang, oceanSize=size, oceanOrigin=origin, oceanDepth=oceanDepth, seed=plumeSeed, random=randomPlume, **kwargs) else: self.pollution = None
## Properties ============================================================# @property def N(self)->int: """Number of simulation iterations.""" return self._N @N.setter def N(self, n:int)->None: """Set number of iterations and resize current data.""" if (('current' in self.__dict__) and (self.current is not None) and (self.current.n != n)): self.current.n = n self._N = n #-------------------------------------------------------------------------- @property def sampleTime(self)->float: """Time step of each iteration (s).""" return self._sampleTime @sampleTime.setter def sampleTime(self, h:float)->None: """Set iteration time step and resample data.""" if (('current' in self.__dict__) and (self.current is not None) and (self.current.h != h)): self.current.h = h self._sampleTime = h #-------------------------------------------------------------------------- @property def size(self)->int: """Length of one side of floor area.""" return self._size @size.setter def size(self, size:int)->None: """Set size and recreate floor map. Assumes same origin.""" if (('floor' in self.__dict__) and (self.floor is not None) and (self.floor.size != size)): self.floor.size = size self._size = size #-------------------------------------------------------------------------- @property def origin(self)->List[float]: """Coordinates of zero point (x=0,y=0).""" return self._origin @origin.setter def origin(self, origin:List[float])->None: """Set coordinates of zero point (x=0,y=0).""" if (('floor' in self.__dict__) and (self.floor is not None)): self.floor.origin = origin self._origin = origin ## Alternative Constructors ==============================================#
[docs] @classmethod def calm_ocean(cls,**kwargs): """ Create deep ocean with slow speeds, steady variations, and calm rates. **Defaults:** - z: 200m - spd: 'typical' - dSpd: 'steady' - dtSpd: 'calm' - ang: 'northeast' - dAng: 'steady' - dtAng: 'calm' Returns ------- ocean : Ocean 'Calm Ocean' instance. """ if ('z' not in kwargs): kwargs['z'] = 200 return cls(name='Calm Ocean', spd='typical',dSpd='steady',dtSpd='calm', ang='northeast',dAng='steady',dtAng='calm', **kwargs)
#--------------------------------------------------------------------------
[docs] @classmethod def dead_ocean(cls,**kwargs): """ Create ocean with zero current and no variations. **Defaults:** - spd: 'dead' - dSpd: 'constant' - dtSpd: 'calm' - ang: 0 - dAng: 'constant' - dtAng: 'calm' Returns ------- ocean : Ocean 'Dead Ocean' instance. """ return cls(name='Dead Ocean', spd='dead',dSpd='constant',dtSpd='calm', ang=0,dAng='constant',dtAng='calm', **kwargs)
#--------------------------------------------------------------------------
[docs] @classmethod def stormy_ocean(cls,**kwargs): """ Create ocean with fast current, unsteady variations, and choppy rates. **Defaults:** - spd: 'fast' - dSpd: 'unsteady' - dtSpd: 'choppy' - ang: 'any' - dAng: 'unsteady' - dtAng: 'choppy' Returns ------- ocean : Ocean 'Stormy Ocean' instance. """ return cls(name='Stormy Ocean', spd='fast',dSpd='unsteady',dtSpd='choppy', ang='any',dAng='unsteady',dtAng='choppy', **kwargs)
## Special Methods =======================================================#
[docs] def __repr__(self) -> str: """Detailed description of Ocean""" geometry = "" if (self.floor is None): geometry = f"size={self.size}, origin={self.origin}, " return ( f"{self.__class__.__name__}(" f"name='{self.name}', {geometry}" f"current={self.current!r}, " f"floor={self.floor!r}, " f"pollution={self.pollution!r})" )
#--------------------------------------------------------------------------
[docs] def __str__(self) -> str: """User friendly description of Ocean""" cw = 16 components = [] if (self.floor is not None): floorStr = str(self.floor) else: components.append(f"{'Size:':{cw}} {self.size} m") components.append(f"{'Origin:':{cw}} {self.origin}\n") floorStr = "Floor None\n" components.extend([str(self.current), floorStr]) if (self.pollution is not None): components.append(str(self.pollution)) out = '\n'.join(components) ll = len(max(out.split('\n'), key=len)) return ( f"\nOcean: {self.name}\n" f"{'-'*ll}\n" f"{out}" f"{'-'*ll}\n" )
###############################################################################
[docs]@dataclass class Current1DData: """ Immutable data container for Current1D. Offers lightweight alternative to full Current1D instance. Suitable for serialization. Attributes ---------- speed : ndarray Speed time series in m/s. angle : ndarray Direction time series in radians. v_spd : float Mean speed value. v_dv : float Speed amplitude deviation. v_dt : float Speed half-period. b_ang : float Mean direction value. b_db : float Direction amplitude deviation. b_dt : float Direction half-period. n : int Number of iterations. h : float Time step in seconds. seed : int PRNG seed used. See Also -------- Current1D.getAsData : Convert Current1D instance into Current1DData instance Current1D.genAsData : Class method to generate a Current1DData instance with Current1D methods. Effectively a constructor for a Current1DData object. """ __slots__ = ['speed', 'angle', 'v_spd', 'v_dv', 'v_dt', 'b_ang', 'b_db', 'b_dt', 'n', 'h', 'seed'] speed: NPFltArr angle: NPFltArr v_spd: float v_dv: float v_dt: float b_ang: float b_db: float b_dt: float n: int h: float seed: int
###############################################################################
[docs]class Current1D: """ Time-varying ocean current with configurable speed and direction profiles. Generates realistic ocean current time series using sinusoidal modulation with user-configurable mean values, variation amplitudes, and temporal periods. Supports categorical parameter specification for rapid prototyping or precise numerical control for advanced scenarios. Parameters ---------- spd : float, list, or str, default='typical' Mean current speed specification in meters per second. This value is the overall mean value and the actual speed values will fluctuate about this mean. If a numerical range is provided, a random value within the range will be selected. If a named range is provided, a random value within the predefined range will be selected. - float: Explicit speed in m/s (0.0-2.5) - list: [low, high] range of speed in m/s (0.0-2.5) - str categories (m/s): - 'dead': 0.0 - 'typical': 0.05 - 0.5 - 'moderate': 0.5 - 1.0 - 'swift': 1.0 - 1.8 - 'fast': 1.8 - 2.5 - 'any': 0.0 - 2.5 dSpd : float, list, or str, default='steady' Speed variation amplitude specification in meters per second. This value is the average amplitude deviation about the mean value through which the current speed will fluctuation. Each oscillation will selecte a new amplitude around this value. This provides more stoichastic variation rather than simple uniform periodic oscillation. If a numerical range is provided, a random value within the range will be selected. If a named range is provided, a random value within the predefined range will be selected. - float: Explicit amplitude in m/s (0.0-1.5) - list: [low, high] range of speed in m/s (0.0-1.5) - str categories (m/s): - 'constant': 0.0 - 'steady': 0.01 - 0.1 - 'varied': 0.1 - 0.5 - 'unsteady': 0.5 - 1.5 - 'any': 0.0 - 1.5 dtSpd : float, list, or str, default='regular' Speed variation half-period specification in seconds. This value is the time of an oscillation from one amplitude to the next (e.g., from max speed to min speed in one oscillation period). Smaller values produce more frequent changes and more unsteady conditions. If a numerical range is proided, a random value within the range will be selected. If a named range is provided, a random value within the predefined range will be selected. - float: Explicit period in seconds - list: [low, high] range of time in seconds - str categories (s): - 'calm': 600 - 'smooth': 90 - 150 - 'regular': 30 - 90 - 'choppy': 10 - 30 - 'any': 10 - 600 ang : float, list, or str, default='northeast' Mean current direction specification in radians. This value is the overall mean value and the actual angle values will fluctuate about this mean. If a numerical range is provided, a random value within the range will be selected. If a named range is provided, a random value within the predefined range will be selected. - float: Explicit angle in radians (0=East, pi/2=North, etc.) - list: [low, high] range of angle in radians within (-pi,pi) - str categories: - 'east': 0 +/- pi/8 - 'northeast': pi/4 +/- pi/8 - 'north': pi/2 +/- pi/8 - 'northwest': 3pi/4 +/- pi/8 - 'west': pi +/- pi/8 - 'southwest': -3pi/5 +/- pi/8 - 'south': -pi/2 +/- pi/8 - 'southeast': -pi/4 +/- pi/8 - 'any': 0 +/- pi dAng : float, list, or str, default='steady' Direction variation amplitude specification in radians. This value is the average amplitude deviation about the mean value through which the current angle will fluctuation. Each oscillation will selecte a new amplitude around this value. This provides more stoichastic variation rather than simple uniform periodic oscillation. If a numerical range is provided, a random value within the range will be selected. If a named range is provided, a random value within the predefined range will be selected. - float: Explicit amplitude in radians (0.0-pi/2) - list: [low, high] range of angle in radians within (0.0-pi/2) - str categories: - 'constant': 0.0 - 'steady': 0.1 - pi/18 - 'varied': pi/18 - pi/8 - 'unsteady': pi/8 - pi/2 - 'any': 0.0 - pi/2 dtAng : float, list, or str, default='regular' Direction variation half-period specification in seconds. This value is the time of an oscillation from one amplitude to the next (e.g., from max angle to min angle in one oscillation period). Smaller values produce more frequent changes and more unsteady conditions. If a numerical range is proided, a random value within the range will be selected. If a named range is provided, a random value within the predefined range will be selected. - float: Explicit period in seconds - list: [low, high] range of time in seconds - str categories (s): - 'calm': 600 - 'smooth': 90 - 150 - 'regular': 30 - 90 - 'choppy': 10 - 30 - 'any': 10 - 600 nIter : int, default=60000 Number of simulation iterations (time series length). h : float, default=0.02 Time step per iteration in seconds. seed : int, optional PRNG seed for reproducibility. If None, generates random seed for unique current patterns each run. Attributes ---------- speed : ndarray, shape (N,) Time series of current speed in m/s. angle : ndarray, shape (N,) Time series of current direction in radians. v_spd : float Mean current speed in m/s. From spd keyword. v_dv : float Speed variation amplitude in m/s. From dSpd keyword. v_dt : float Speed variation half-period in seconds. From dtSpd keyword. b_ang : float Mean current direction in radians. From ang keyword. b_db : float Direction variation amplitude in radians. From dAng keyword. b_dt : float Direction variation half-period in seconds. From dtAng keyword. N : int Number of time samples. sampleTime : float Time step in seconds. seed : int PRNG seed used for reproducible generation. Methods ------- genSpeedList(mean, stddev, dt, n, h) : Generate a list of speed values that fluctuate around the mean. genAngleList(mean, stddev, dt, n, h) : Generate a list of angle values that fluctuate around the mean. display() : Display Current1D data in simple speed / angle vs time plots. getAsData() : Construct a Current1DData object from Current1D instance. genAsData(spd, dSpd, dtSpd, ang, dAng, dtAng, nIter, h, seed) : Static method. Construct a Current1DData object from input parameters. Generation Model ---------------- Speed and direction are modeled as sinusoidal oscillations: .. code-block:: none speed(t) = v_spd + v_dv * sin(pi * t / v_dt) angle(t) = b_ang + b_db * sin(pi * t / b_dt) Providing smoothly varying, stoichastic fluctuations around desired mean values. Notes ----- **Categorical vs Numerical Parameters:** Categorical strings are converted to numerical ranges via internal tables. Use strings for general specification or to allow small randomness in input. Use explicit numbers for precise control. >>> # Categorical (quick setup) >>> current1 = Current1D(spd='moderate', dSpd='steady', ang='north') >>> >>> # Numerical Range (general control) >>> current2 = Current1D(spd=[0.75,1.25], dSpd=[0.05,0.40], ang=np.pi/2) >>> >>> # Numerical (exact control) >>> current3 = Current1D(spd=0.75, dSpd=0.05, ang=np.pi/2) **Randomization:** When using categorical strings and numerical ranges, random selection within the category is performed by uniform random distribution: >>> current = Current1D(spd='typical', seed=42) >>> # v_spd selected uniformly from [0.05, 0.5] m/s Within each half-period of oscillation, the speed and angle amplitudes are randomly selected around the mean deviation by random normal distribution with a 3-sigma limit. **Resampling:** Changing N or sampleTime regenerates time series: >>> current.N = 100000 # Extends simulation time >>> current.sampleTime = 0.01 # Doubles sample rate >>> # Both automatically call _genVals() **Coordinate System:** Angles follow END (East-North-Down) convention: - 0 rad: East (+x direction) - pi/2 rad: North (+y direction) - pi rad: West (-x direction) - -pi/2 rad: South (-y direction) Examples -------- ### Create typical coastal current: >>> current = Current1D( ... spd='typical', # 0.05-0.5 m/s ... dSpd='steady', # +/- 0.01-0.1 m/s ... dtSpd='calm', # 600s half-period ... ang='northeast', # pi/4 +/- pi/8 ... dAng='steady', # +/- 0.1-0.17 rad (10 deg) ... dtAng='smooth', # 90-150s half-period ... nIter=60000, ... seed=100 ... ) >>> print(f"Mean: {current.v_spd:.2f} m/s at {current.b_ang:.1f} rad") ### Strong tidal current: >>> tidal = Current1D( ... spd='fast', # 1.8-2.5 m/s ... dSpd='unsteady', # +/- 0.5-1.5 m/s (ebb/flood) ... dtSpd='smooth', # 90-150s ... ang='north', # Aligned with channel ... dAng='constant', # No direction change ... dtAng='regular', ... nIter=120000, # 40 minutes ... ) >>> tidal.display() ### Zero current for baseline: >>> dead_current = Current1D( ... spd='dead', ... dSpd='constant', ... dtSpd=600, ... ang=0.0, ... dAng='constant', ... dtAng=600, ... nIter=60000, ... ) >>> dead_current.display() ### Precise numerical control: >>> precise = Current1D( ... spd=0.82, # Exactly 0.82 m/s ... dSpd=0.15, # Exactly +/- 0.15 m/s ... dtSpd=120.0, # 120s half-period ... ang=0.785, # pi/4 (45 deg) ... dAng=0.1, # +/- 0.1 rad (+/- 5.7 deg) ... dtAng=90.0, # 90s half-period ... nIter=60000, ... seed=42 ... ) ### Access time series data: >>> current = Current1D(spd='moderate', seed=50) >>> # Sample at specific iteration >>> speed_1000 = current.speed[1000] >>> angle_1000 = current.angle[1000] >>> >>> # Vector components (END frame) >>> u_current = speed_1000 * np.cos(angle_1000) # East component >>> v_current = speed_1000 * np.sin(angle_1000) # North component ### Use in vehicle dynamics: >>> # Vehicle reads current from sensor >>> V_c = current.speed[iteration] >>> beta_Vc = current.angle[iteration] >>> # Apply to vehicle dynamics (see vehicles.py) See Also -------- Ocean : Container class that manages Current1D instance Current1DData : Immutable data container for serialization navigation.OceanCurrentSensor : Sensor that samples current data Warnings -------- - Sinusoidal model does not capture turbulent fluctuations or eddies. Being 1D data, the current will be taken as uniform across any arbitrary ocean space (at any given time slice). - 1D model assumes depth-uniform current (not realistic for stratified water). """ ## Class Constants =======================================================# _V_LO = 0.0 # Absolute min value for Ocean Current Speed (m/s) _V_HI = 2.5 # Standard max value for Ocean Current Speed (m/s) _CI = 3.0 # Confidence Interval for Standard Deviation calc ## Constructor ===========================================================#
[docs] def __init__(self, spd:Union[float, str, List[float]] = 'typical', dSpd:Union[float, str, List[float]] = 'steady', dtSpd:Union[float, str, List[float]] = 'regular', ang:Union[float, str, List[float]] = 'northeast', dAng:Union[float, str, List[float]] = 'steady', dtAng:Union[float, str, List[float]] = 'regular', nIter:int = 60000, h:float = 0.02, seed:Optional[int] = None, **kwargs, )->None: """ Construct time-varying ocean current with sinusoidal speed and direction profiles. Generates realistic ocean current time series using sinusoidal modulation around user-specified mean values. Supports categorical parameter specification (quick setup) or precise numerical control (advanced tuning). Designed for deterministic or randomized current generation in AUV simulation scenarios. Parameters ---------- spd : float, str, or list of float, default='typical' Mean current speed specification in meters per second (m/s). This is the central value around which speed oscillates. **Float**: Exact mean speed in m/s (0.0-2.5). >>> Current1D(spd=0.75) # Exactly 0.75 m/s mean **List [low, high]**: Random selection within range via uniform distribution. >>> Current1D(spd=[0.5, 1.0]) # Random between 0.5-1.0 m/s **String categories** (m/s ranges): - 'dead': 0.0 (no current) - 'typical': 0.05-0.5 (coastal/shelf currents) - 'moderate': 0.5-1.0 (active shelf/slope) - 'swift': 1.0-1.8 (strong tidal/boundary currents) - 'fast': 1.8-2.5 (extreme tidal/jet streams) - 'any': 0.0-2.5 (full randomization) Automatically clamped to [0.0, 2.5] m/s. dSpd : float, str, or list of float, default='steady' Mean speed variation amplitude in m/s (+/- deviation from spd). Controls magnitude of sinusoidal speed oscillations. Each half-period samples new amplitude from normal distribution around this value (adds stochastic variation). **Float**: Exact mean amplitude deviation in m/s (0.0-1.5). >>> Current1D(spd=1.0, dSpd=0.2) # Speed varies ~0.8-1.2 m/s **List [low, high]**: Random mean amplitude deviation from range. >>> Current1D(dSpd=[0.1, 0.3]) **String categories** (m/s ranges): - 'constant': 0.0 (no variation, steady speed) - 'steady': 0.01-0.1 (gentle fluctuations) - 'varied': 0.1-0.5 (moderate variations) - 'unsteady': 0.5-1.5 (large tidal-like swings) - 'any': 0.0-1.5 (full randomization) Automatically limited to not exceed speed bounds [0.0, 2.5]. dtSpd : float, str, or list of float, default='regular' Speed variation half-period in seconds. Time for one oscillation from max to min (or vice versa). Smaller values -> more frequent, choppy changes. Larger values -> slower, smoother variations. **Float**: Exact half-period in seconds. >>> Current1D(dtSpd=120.0) # 2-minute oscillation period **List [low, high]**: Random selection of period range. >>> Current1D(dtSpd=[30, 90]) **String categories** (second ranges): - 'calm': 600 (10-minute oscillations, very smooth) - 'smooth': 90-150 (1.5-2.5 minutes, gentle) - 'regular': 30-90 (0.5-1.5 minutes, typical) - 'choppy': 10-30 (10-30 seconds, rapid changes) - 'any': 10-600 (full randomization) ang : float, str, or list of float, default='northeast' Mean current direction in radians (END convention). This is the central bearing around which direction oscillates. **Float**: Exact mean direction in radians. >>> Current1D(ang=np.pi/2) # North (90 deg) >>> Current1D(ang=0) # East (0 deg) **List [low, high]**: Random mean direction from range. >>> Current1D(ang=[0, np.pi/4]) # East to Northeast **String categories** (radian ranges, +/- pi/8 from center): - 'east': 0 +/- pi/8 (337.5 deg to 22.5 deg) - 'northeast': pi/4 +/- pi/8 ( 22.5 deg to 67.5 deg) - 'north': pi/2 +/- pi/8 ( 67.5 deg to 112.5 deg) - 'northwest': 3pi/4 +/- pi/8 (112.5 deg to 157.5 deg) - 'west': pi +/- pi/8 (157.5 deg to 202.5 deg) - 'southwest': -3pi/4 +/- pi/8 (202.5 deg to 247.5 deg) - 'south': -pi/2 +/- pi/8 (247.5 deg to 292.5 deg) - 'southeast': -pi/4 +/- pi/8 (292.5 deg to 337.5 deg) - 'any': 0 +/- pi (full circle, random) Automatically wrapped to [-pi, pi] (smallest signed angle). dAng : float, str, or list of float, default='steady' Mean direction variation amplitude in radians (+/- deviation from mean). Controls magnitude of direction oscillations. Each half-period samples new amplitude from normal distribution around this value. **Float**: Exact mean angular deviation in radians (0.0-pi/2). >>> Current1D(ang=0, dAng=0.2) # +/- 11.5 deg oscillation **List [low, high]**: Random mean amplitude selection from range. >>> Current1D(dAng=[0.1, 0.3]) # +/- 5.7 deg to +/- 17.2 deg **String categories** (radian ranges): - 'constant': 0.0 (no directional change) - 'steady': 0.1 - pi/18 (+/- 5.7 deg to +/- 10 deg, slight meander) - 'varied': pi/18 - pi/8 (+/- 10 deg to +/- 22.5 deg, moderate swing) - 'unsteady': pi/8 - pi/2 (+/- 22.5 deg to +/- 90 deg, large swings) - 'any': 0.0 - pi/2 (full range) Automatically clipped to [0, pi]. dtAng : float, str, or list of float, default='regular' Direction variation half-period in seconds. Time for one directional oscillation from max to min. **Float**: Exact half-period in seconds. >>> Current1D(dtAng=180.0) # 3-minute direction cycle **List [low, high]**: Random period range. **String categories**: Same as dtSpd (calm/smooth/regular/choppy/any). nIter : int, default=60000 Number of simulation iterations (time series length). Default 60000 iterations * 0.02s = 1200s = 20 minutes. h : float, default=0.02 Time step per iteration in seconds. Default 0.02s = 50 Hz sampling rate. Total simulation time = nIter * h seconds. seed : int, optional PRNG seed for reproducibility. If None, generates random seed from system entropy (unique each run). If provided, ensures identical current time series for same parameters. seed=0 is valid and deterministic. **kwargs Additional keyword arguments. Attributes ---------- speed : ndarray, shape (nIter,) Generated speed time series in m/s. angle : ndarray, shape (nIter,) Generated direction time series in radians. v_spd : float Mean speed in m/s (after randomization/validation). v_dv : float Speed amplitude in m/s (after randomization/validation). v_dt : float Speed half-period in seconds. b_ang : float Mean direction in radians (after randomization/wrapping). b_db : float Direction amplitude in radians (after randomization/clipping). b_dt : float Direction half-period in seconds. n : int Number of iterations (stored via property, triggers generation). h : float Time step in seconds (stored via property, triggers resampling). seed : int Actual PRNG seed used (from parameter or generated). Generation Process ------------------ 1. **Seed Initialization:** If seed=None: Generate from np.random.SeedSequence().entropy. Store in self.seed and create RNG: self._rng = np.random.default_rng(seed). 2. **Parameter Resolution:** For each parameter (spd, dSpd, dtSpd, ang, dAng, dtAng): a. If string: Look up in category dictionary, select random value in range. b. If list [low, high]: Select random value via uniform distribution. c. If float: Use value. d. Apply validation/clipping/wrapping via property setters. 3. **Standard Deviation Calculation:** Convert amplitude deviations to 3-sigma standard deviations: - sigma_speed = v_dv / 3 - sigma_angle = b_db / 3 Stored as _v_sdv and _b_sdb (computed lazily when accessed). 4. **RNG State Capture:** Before each time series generation, store RNG state: - _spd_rng_state: For speed reproducibility - _ang_rng_state: For angle reproducibility Allows resizing/resampling while maintaining pattern. 5. **Time Series Generation:** Call genSpeedList() and genAngleList(): - Each creates sinusoidal oscillations with variable amplitude - Amplitude resampled each half-period from normal distribution - Speed clipped to [0.0, 2.5] m/s - Angle wrapped to [-pi, pi] Store in self.speed and self.angle arrays. Sinusoidal Oscillation Model ----------------------------- **Speed Time Series:** v(t) = v_spd + a_i * sin((pi*t)/(v_dt)) where: - v_spd: Mean speed - a_i: Amplitude for half-period i, drawn from N(v_dv, sigma_v) - v_dt: Half-period duration - Direction flips each half-period **Direction Time Series:** theta(t) = b_ang + a_i * sin((pi*t)/(b_dt)) where: - b_ang: Mean direction - a_i: Amplitude for half-period i, drawn from N(b_db, sigma_b) - b_dt: Half-period duration - Direction flips each half-period **Stochastic Variation:** Each oscillation half-period selects new amplitude: - a_i ~ N(mean_amplitude, stddev) - Adds natural variability (not purely periodic) - Simulates realistic environmental fluctuations Notes ----- **Parameter Priority:** When parameters specified multiple ways: 1. Property setter validation (clipping/wrapping) 2. User explicit value (if float) 3. Random from custom range (if list) 4. Random from category range (if string) **Categorical Parameter Philosophy:** String categories enable quick prototyping without remembering exact numerical ranges: >>> current = Current1D(spd='moderate', dSpd='steady', ang='north') For precise control, use floats: >>> current = Current1D(spd=0.82, dSpd=0.15, ang=np.pi/2) **Reproducibility:** Identical seed + parameters -> identical time series: >>> c1 = Current1D(spd='typical', seed=42) >>> c2 = Current1D(spd='typical', seed=42) >>> np.array_equal(c1.speed, c2.speed) True But different seeds with same category -> different values: >>> c3 = Current1D(spd='typical', seed=43) >>> c1.v_spd != c3.v_spd # Different mean selected True **Property-Triggered Regeneration:** Changing n or h after initialization regenerates time series: >>> current = Current1D(nIter=10000, seed=100) >>> current.n = 20000 # Extends to 20000 iterations >>> # Uses stored RNG state to maintain pattern continuity **Integration with Ocean:** Ocean passes categorical or numerical current parameters: >>> ocean = Ocean( ... spd='moderate', ... dSpd='steady', ... dtSpd='calm', ... ang='northeast', ... dAng='steady', ... dtAng='calm', ... N=60000, ... h=0.02, ... currentSeed=42 ... ) >>> ocean.current # Current1D instance with these parameters Warnings -------- - Invalid string categories trigger warning and fallback to defaults - Invalid list lengths (!=2) trigger warning and fallback - Extremely small dtSpd or dtAng (<1s) may cause rapid oscillations See Also -------- genSpeedList : Speed time series generation algorithm genAngleList : Direction time series generation algorithm Current1DData : Immutable data container for serialization Ocean.__init__ : Creates Current1D with oceanographic parameters Examples -------- ### Quick categorical setup: >>> import munetauvsim.environment as env >>> current = env.Current1D( ... spd='moderate', ... dSpd='steady', ... dtSpd='regular', ... ang='north', ... dAng='constant', ... dtAng='calm', ... nIter=60000, ... seed=42 ... ) >>> print(f"Mean: {current.v_spd:.2f} m/s at {current.b_ang:.2f} rad") ### Precise numerical control: >>> current = env.Current1D( ... spd=0.82, ... dSpd=0.15, ... dtSpd=120.0, ... ang=np.pi/4, ... dAng=0.1, ... dtAng=90.0, ... nIter=60000, ... h=0.02, ... seed=100 ... ) ### Mixed categorical and numerical: >>> current = env.Current1D( ... spd=[0.5, 1.0], # Custom range ... dSpd='steady', # Category ... dtSpd=60.0, # Exact value ... ang='northeast', ... seed=50 ... ) ### Access time series: >>> current = env.Current1D(spd='typical', nIter=10, seed=0) >>> print(current.speed[:5]) [0.28 0.30 0.32 0.31 0.29] # Example values >>> print(current.angle[:5]) [0.85 0.87 0.88 0.86 0.84] # Example values ### Visualization: >>> # Shows speed and angle plots vs time >>> current = env.Current1D( ... spd='moderate', ... dSpd='varied', ... dtSpd='choppy', ... nIter=30000, ... seed=42 ... ) >>> current.display() ### Export as data object: >>> current = env.Current1D(spd='typical', seed=999) >>> data = current.getAsData() >>> print(type(data)) <class 'Current1DData'> >>> # Immutable dataclass for serialization """ ##-- Random Number Generator -----------------------------------------# if (seed is None): seed = np.random.SeedSequence().entropy self.seed = seed # RNG seed self._rng = np.random.default_rng(self.seed) # RNG instance ##-- Set Property Values ---------------------------------------------# self.n = nIter # Number of simulation iterations self.h = h # Time step of each iteration (s) # Speed Parameters self.v_spd = spd # Speed selected mean value (m/s) self.v_dv = dSpd # Speed deviation (+/-) (m/s) self.v_dt = dtSpd # Speed half period of deviations (s) # Angle Parameters self.b_ang = ang # Angle selected mean value (rad) self.b_db = dAng # Angle deviation (+/-) (rad) self.b_dt = dtAng # Angle half period of deviations (s) ##-- Construct Concrete Values ---------------------------------------# self._spd_rng_state = self._genRNGStartState(seed+1) self._rng.bit_generator.state = self._spd_rng_state self.speed = self.genSpeedList(self.v_spd, self.v_sdv, self.v_dt, self.n, self.h) self._ang_rng_state = self._genRNGStartState(seed+2) self._rng.bit_generator.state = self._ang_rng_state self.angle = self.genAngleList(self.b_ang, self.b_sdb, self.b_dt, self.n, self.h)
## Properties ============================================================# @property def n(self)->int: """Number of simulation iterations.""" return self._n @n.setter def n(self, nIter:int)->None: """Set number of iterations and resize data.""" if ('_n' in self.__dict__): if (nIter > self._n): self._rng.bit_generator.state = self._spd_rng_state self.speed = self.genSpeedList(self.v_spd, self.v_sdv, self.v_dt, nIter, self.h) self._rng.bit_generator.state = self._ang_rng_state self.angle = self.genAngleList(self.b_ang, self.b_sdb, self.b_dt, nIter, self.h) else: self.speed = self.speed[:nIter] self.angle = self.angle[:nIter] self._n = nIter #-------------------------------------------------------------------------- @property def h(self)->float: """Time step of each iteration (s).""" return self._h @h.setter def h(self, h:float)->None: """Set iteration time step and resample data.""" if ('_h' in self.__dict__): self._rng.bit_generator.state = self._spd_rng_state self.speed = self.genSpeedList(self.v_spd, self.v_sdv, self.v_dt, self.n, h) self._rng.bit_generator.state = self._ang_rng_state self.angle = self.genAngleList(self.b_ang, self.b_sdb, self.b_dt, self.n, h) self._h = h ## Speed Properties ======================================================# #-- Speed: Mean Value ----------------------------------------------------# @property def v_spd(self)->float: """Selected mean speed.""" return self._v_spd @v_spd.setter def v_spd(self, spd:Union[float, str, List[float]])->None: """Input automatically clamped to range [_V_LO,_V_HI].""" self._v_spd = self._set_v_spd(spd) #. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
[docs] @singledispatchmethod def _set_v_spd(self, spd:Union[int,float])->float: """Set speed value by dispatch based on input type.""" return self._clamp(spd,Current1D._V_LO,Current1D._V_HI)
[docs] @_set_v_spd.register(list) @_set_v_spd.register(str) def _set_v_spd_list(self, spd:Union[list,str])->float: """Set speed value from range.""" return self._selectFromZone(spd, 0.25, 'Speed', self.v_spdZones)
#-- Speed: Deviation -----------------------------------------------------# @property def v_dv(self)->float: """Mean deviation of selected mean speed value in terms of +/-.""" return self._v_dv @v_dv.setter def v_dv(self, dSpd:Union[float, str, List[float]])->None: """Input cannot exceed possible values range.""" self._v_sdv = None if (self.v_spd == 0): dSpd = 0.0 self._v_dv = self._set_v_dv(dSpd) #. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
[docs] @singledispatchmethod def _set_v_dv(self, dSpd:Union[int,float])->float: """Set speed deviation value by dispatch based on input type.""" upper = abs(Current1D._V_HI - self.v_spd) lower = abs(self.v_spd - Current1D._V_HI) return min(min(lower,upper), dSpd)
[docs] @_set_v_dv.register(list) @_set_v_dv.register(str) def _set_v_dv_list(self, dSpd:Union[list,str])->float: """Set speed deviation value from range.""" return self._selectFromZone(dSpd, 0.1, 'Delta Speed', self.v_dvZones)
#-- Speed: Time of Deviation ---------------------------------------------# @property def v_dt(self)->float: """Time of selected mean deviations from one extreme to next.""" return self._v_dt @v_dt.setter def v_dt(self, dtSpd:Union[float, str, List[float]])->None: """Set time of speed deviation.""" self._v_dt = self._set_v_dt(dtSpd) #. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
[docs] @singledispatchmethod def _set_v_dt(self, dtSpd:Union[int,float])->float: """Set time of speed deviation by dispatch based on input type.""" return dtSpd
[docs] @_set_v_dt.register(list) @_set_v_dt.register(str) def _set_v_dt_list(self, dtSpd:Union[list,str])->float: """Set time of speed deviation from range.""" return self._selectFromZone(dtSpd,60,'Delta Speed Rate',self.v_dtZones)
#-- Speed: Standard Deviation --------------------------------------------# @property def v_sdv(self)->float: """Auto-calculated. Standard deviation of mean speed deviations.""" if (self._v_sdv is None): self._v_sdv = self._calcSigma(self.v_spd, self.v_spd+self.v_dv) return self._v_sdv #-- Speed: Named Zones ---------------------------------------------------# @property def v_spdZones(self)->dict: """Named zones for speed values.""" if ('_v_spdZones' not in self.__dict__): self._v_spdZones = {'dead': [0.0,0.0], 'typical': [0.05,0.5], 'moderate': [0.5,1.0], 'swift': [1.0,1.8], 'fast': [1.8,2.5], 'any': [0.0,2.5]} return self._v_spdZones #. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . @property def v_dvZones(self)->dict: """Named zones for amount of deviation in speed values.""" if ('_v_dvZones' not in self.__dict__): self._v_dvZones = {'constant': [0.0,0.0], 'steady': [0.01,0.1], 'varied': [0.1,0.5], 'unsteady': [0.5,1.5], 'any': [0.0,1.5]} return self._v_dvZones #. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . @property def v_dtZones(self)->dict: """Named zones for rate of change in speed values.""" if ('_v_dtZones' not in self.__dict__): self._v_dtZones = {'calm': [600.0,600.0], 'smooth': [90.0,150.0], 'regular': [30.0,90.0], 'choppy': [10.0,30.0], 'any': [10.0,600.0]} return self._v_dtZones ## Angle Properties ======================================================# #-- Angle: Mean Value ----------------------------------------------------# @property def b_ang(self)->float: """Selected mean angle.""" return self._b_ang @b_ang.setter def b_ang(self, betaVal:Union[float, str, List[float]])->None: """Input reformatted to smallest-signed angle from (-pi,pi].""" self._b_ang = self._set_b_ang(betaVal) #. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
[docs] @singledispatchmethod def _set_b_ang(self, betaVal:Union[int,float])->float: """Set angle value by dispatch based on input type.""" return self._ssa(betaVal)
[docs] @_set_b_ang.register(list) @_set_b_ang.register(str) def _set_b_ang_list(self, betaVal:Union[list,str])->float: """Set angle value from range.""" return self._selectAngleFromZone(betaVal, np.pi/4, 'Angle', self.b_angZones)
#-- Angle: Deviation -----------------------------------------------------# @property def b_db(self)->float: """Mean deviation of selected mean angle value in terms of +/-.""" return self._b_db @b_db.setter def b_db(self, dAng:Union[float, str, List[float]])->None: """Input limited to positive value from (0, pi).""" self._b_sdb = None self._b_db = self._set_b_db(dAng) #. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
[docs] @singledispatchmethod def _set_b_db(self, dAng:Union[int,float])->float: """Set angle mean deviation value by dispatch based on input type.""" return abs(dAng) % (np.pi+1E-15)
[docs] @_set_b_db.register(list) @_set_b_db.register(str) def _set_b_db_list(self, dAng:Union[list,str])->float: """Set angle mean deviation from range.""" return self._selectAngleFromZone(dAng, np.pi/12, 'Delta Angle', self.b_dbZones)
#-- Angle: Time of Deviation ---------------------------------------------# @property def b_dt(self)->float: """Time of selected angle deviations from one extreme to next.""" return self._b_dt @b_dt.setter def b_dt(self, dtAng:Union[float, str, List[float]])->None: """Set time of angle deviation.""" self._b_dt = self._set_b_dt(dtAng) #. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
[docs] @singledispatchmethod def _set_b_dt(self, dtAng:Union[int,float])->float: """Set time of angle deviation by dispatch based on input type.""" return dtAng
[docs] @_set_b_dt.register(list) @_set_b_dt.register(str) def _set_b_dt_list(self, dtAng:Union[list,str])->float: """Set time of angle deviation from range.""" return self._selectFromZone(dtAng,60,'Delta Angle Rate',self.b_dtZones)
#-- Angle: Standard Deviation --------------------------------------------# @property def b_sdb(self)->float: """Auto-calculated. Standard deviation of mean angle deviation.""" if (self._b_sdb is None): self._b_sdb = self._calcAngleSigma(self.b_ang,self.b_ang+self.b_db) return self._b_sdb #-- Angle: Named Zones ---------------------------------------------------# @property def b_angZones(self)->dict: """Named zones for angle values.""" if ('_b_angZones' not in self.__dict__): self._b_angZones = {'east': [-np.pi/8,np.pi/8], 'northeast': [np.pi/8,3*np.pi/8], 'north': [3*np.pi/8,5*np.pi/8], 'northwest': [5*np.pi/8,7*np.pi/8], 'west': [7*np.pi/8,-7*np.pi/8], 'southwest': [-7*np.pi/8,-5*np.pi/8], 'south': [-5*np.pi/8,-3*np.pi/8], 'southeast': [-3*np.pi/8,-np.pi/8], 'any': [-np.pi,np.pi]} return self._b_angZones #. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . @property def b_dbZones(self)->dict: """Named zones for amount of deviation in angle values.""" if ('_b_dbZones' not in self.__dict__): self._b_dbZones = {'constant': [0.0,0.0], 'steady': [0.01,np.pi/18], # 0-10 'varied': [np.pi/18,np.pi/8], # 10-22.5 'unsteady': [np.pi/8,np.pi/2], # 22.5-90 'any': [0.0,np.pi/2]} # 0-90 return self._b_dbZones #. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . @property def b_dtZones(self)->dict: """Named zones for rate of change in angle values.""" if ('_b_dtZones' not in self.__dict__): self._b_dtZones = self.v_dtZones return self._b_dtZones ## Property Helper Methods ===============================================# # Numeric - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
[docs] @singledispatchmethod def _selectFromZone(self, unknown, default:float, zone:str, *args)->float: """ Select a value from a specific zone. Notes ----- - Valid input types: String, List (should be int or float). - Dispatch base case catches inputs without a registered format type. """ log.warning("Unsupported %s zone format: %s", zone, type(unknown)) self._printFallback(default) return default
[docs] @_selectFromZone.register(list) def _selectFromZoneList(self, input:list, default:float, zone:str, *args)->float: """Select random value from within listed range.""" if (len(input) != 2): self._printBadZoneList(input, zone, default) return default return self._selectFromLoHi(*input)
[docs] @_selectFromZone.register(str) def _selectFromZoneStr(self, input:str, default:float, zone:str, zone_dict)->float: """Select random value from within named range.""" if (input not in zone_dict): self._printBadZoneName(input, zone_dict.keys(), zone, default) return default return self._selectFromLoHi(*zone_dict[input])
# Angle - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
[docs] @singledispatchmethod def _selectAngleFromZone(self, unknown, default:float, zone:str, *args)->None: """ Select an angle from a specific zone. Notes ----- - Valid input types: String, List (should be int or float). - Dispatch base case catches inputs without a registered format type. """ log.warning("Unsupported %s zone format: %s", zone, type(unknown)) self._printFallback(default) return default
[docs] @_selectAngleFromZone.register(list) def _selectAngleFromZoneList(self, input:list, default:float, zone:str, *args)->float: """Select random value from within listed range.""" if (len(input) != 2): self._printBadZoneList(input, zone, default) return default return self._selectAngleFromLoHi(*input)
[docs] @_selectAngleFromZone.register(str) def _selectAngleFromZoneStr(self, input:str, default:float, zone:str, zone_dict)->float: """Select random value from within named range.""" if (input not in zone_dict): self._printBadZoneName(input, zone_dict.keys(), zone, default) return default return self._selectAngleFromLoHi(*zone_dict[input])
## Special Methods =======================================================#
[docs] def __repr__(self) -> str: """Detailed description of Current1D.""" fmt = '.3f' zones = self._buildDescription() out = [ f"n={self.n}", f"h={self.h}", f"seed={self.seed}", f"v_spd={self.v_spd:{fmt}} ('{zones['spd']}')", f"v_dv={self.v_dv:{fmt}} ('{zones['dSpd']}')", f"v_dt={self.v_dt:{fmt}} ('{zones['dtSpd']}')", f"b_ang={self.b_ang:{fmt}} ('{zones['ang']}')", f"b_db={self.b_db:{fmt}} ('{zones['dAng']}')", f"b_dt={self.b_dt:{fmt}} ('{zones['dtAng']}')" ] return f"{self.__class__.__name__}(" + ", ".join(out) + ")"
#-------------------------------------------------------------------------#
[docs] def __str__(self) -> str: """User friendly description of Current1D.""" zones = self._buildDescription() fmt = '.3f' cw = 16 return ( f"Current\n" f"{' Speed:':{cw}} {self.v_spd:{fmt}} +/-{self.v_dv:{fmt}} m/s " f"({self.v_dt:{fmt}}s)\n" f"{' Angle:':{cw}} {self.b_ang:{fmt}} +/-{self.b_db:{fmt}} rad " f"({self.b_dt:{fmt}}s)\n" f"{' Seed:':{cw}} {self.seed}\n" )
## Methods ===============================================================#
[docs] def genSpeedList(self, mean:float, stddev:float, dt:float, n:int, h:float=0.02)->NPFltArr: """ Generate list of speed values that fluctuate around mean. Parameters ---------- mean : float Mean speed in m/s. stddev : float Standard deviation for amplitude sampling. dt : float Half-period in seconds. n : int Number of samples. h : float, default=0.02 Time step in seconds. Returns ------- speed : ndarray, shape (n,) Speed time series clipped to [_V_LO, _V_HI]. """ return np.clip(self._genValueList(mean, stddev, dt, n, h), Current1D._V_LO, Current1D._V_HI)
#-------------------------------------------------------------------------#
[docs] def genAngleList(self, mean:float, stddev:float, dt:float, n:int, h:float=0.02)->NPFltArr: """ Generate list of angle values that fluctuate around mean. Parameters ---------- mean : float Mean direction in radians. stddev : float Standard deviation for amplitude sampling. dt : float Half-period in seconds. n : int Number of samples. h : float, default=0.02 Time step in seconds. Returns ------- angle : ndarray, shape (n,) Direction time series wrapped to [-pi, pi]. """ return self._ssa(self._genValueList(mean, stddev, dt, n, h))
#-------------------------------------------------------------------------#
[docs] def display(self)->None: """Display simple plot to view attribute data arrays.""" speed_s = (f"speed: {self.v_spd:.3} m/s, +/- {self.v_dv:.3}, " + f"{self.v_dt:.0f} s") angle_s = (f"angle: {self.b_ang:.3} rad, +/- {self.b_db:.3}, " + f"{self.b_dt:.0f} s") line = "-"*36 log.info("%s", line) log.info("%s", speed_s) log.info("%s", angle_s) log.info("%s", line) xvals = [x*self.h for x in range(0,len(self.speed))] plt.figure(figsize=(8,9)) plt.subplot(211,title=speed_s) plt.plot(xvals,self.speed) plt.ylabel('(m/s)') plt.grid() plt.subplot(212,title=angle_s) plt.plot(xvals,self.angle) plt.ylabel('(rad)') plt.xlabel('(s)') plt.grid() plt.show()
#-------------------------------------------------------------------------#
[docs] def getAsData(self)->Current1DData: """ Convert this Current1D object to Current1DData object. Returns ------- data : Current1DData Immutable dataclass with all current parameters and arrays. """ return Current1DData(self.speed, self.angle, self.v_spd, self.v_dv, self.v_dt, self.b_ang, self.b_db, self.b_dt, self.n, self.h, self.seed)
## Static Methods ========================================================#
[docs] @staticmethod def genAsData(spd:Union[float, str, List[float]] = 'typical', dSpd:Union[float, str, List[float]] = 'steady', dtSpd:Union[float, str, List[float]] = 'regular', ang:Union[float, str, List[float]] = 'northeast', dAng:Union[float, str, List[float]] = 'steady', dtAng:Union[float, str, List[float]] = 'regular', nIter:int = 50000, h:float = 0.02, seed:Optional[int] = None, )->Current1DData: """ Create a Current1DData object using the Current1D class. Static method convenience constructor for data object. Parameters ---------- Same as Current1D.__init__ Returns ------- data : Current1DData Immutable dataclass with generated current. """ c = Current1D(spd,dSpd,dtSpd,ang,dAng,dtAng,nIter,h,seed=seed) return Current1DData(c.speed, c.angle, c.v_spd, c.v_dv, c.v_dt, c.b_ang, c.b_db, c.b_dt, c.n, c.h, c.seed)
## Helper Methods ========================================================#
[docs] def _clamp(self, val:float, minVal:float, maxVal:float)->float: """Limit value to within specified range.""" return max(min(maxVal, val), minVal)
#-------------------------------------------------------------------------#
[docs] def _calcSigma(self, mu:float, hi:float, ci:Optional[float]=None)->float: """Return standard devation, based on 'hi' within 'ci' sigma of 'mu'.""" if (ci is None): ci = Current1D._CI return abs(hi - mu) / ci
#-------------------------------------------------------------------------#
[docs] def _selectRandomN(self, mean:float, stddev:float)->float: """Return a value from a normal distribution.""" return self._rng.normal(mean, stddev)
#-------------------------------------------------------------------------#
[docs] def _getOffset(self, mean:float, stddev:float)->float: """Return an offset from the mean inside of a normal distribution.""" val = self._selectRandomN(mean, stddev) return abs(mean-val)
#-------------------------------------------------------------------------#
[docs] def _selectRandomU(self, lo:float, hi:float)->float: """Return a value from a uniform distribution.""" return self._rng.uniform(lo, hi)
#-------------------------------------------------------------------------#
[docs] def _selectFromLoHi(self, lo:float, hi:float)->float: """Return a random value from distribution between lo and hi.""" return self._selectRandomU(lo, hi)
#-------------------------------------------------------------------------#
[docs] def _ssa(self, angle:float)->float: """Convert angle to smallest-signed angle in [-pi,pi).""" return (np.asarray(angle) + np.pi) % (2 * np.pi) - np.pi
#-------------------------------------------------------------------------#
[docs] def _subtractSSA(self, minuend:float, subtrahend:float)->float: """Calculate size of arc from subtrahend to minuend.""" if (subtrahend > minuend): return (minuend + (2*np.pi)) - subtrahend return minuend - subtrahend
#-------------------------------------------------------------------------#
[docs] def _calcAngleSigma(self, mu:float, hi:float, ci:Optional[float]=None, )->float: """Return standard devation, based on 'hi' within 'ci' sigma of 'mu'.""" if (ci is None): ci = Current1D._CI return self._subtractSSA(hi, mu) / ci
#-------------------------------------------------------------------------#
[docs] def _selectAngleFromLoHi(self, lo:float, hi:float)->float: """Return a random angle from distribution between lo and hi.""" hi = lo + self._subtractSSA(hi,lo) return self._ssa(self._selectRandomU(lo,hi))
#-------------------------------------------------------------------------#
[docs] def _printFallback(self, default:float)->None: """Print message indicating default fallback value being used.""" log.warning("Using fallback value: %s", default)
#-------------------------------------------------------------------------#
[docs] def _printBadZoneList(self, input:Union[List[int],List[float]], zone:str, default:float)->None: """Print message if invalid range given for list zone.""" log.warning("%s is not a valid range for %s.", input, zone) log.warning("Range must have two elements: [Low, High]") self._printFallback(default)
#-------------------------------------------------------------------------#
[docs] def _printBadZoneName(self, input:str, keys:KeysView, zone:str, default:float)->None: """Print message if invalid option given for named zone.""" options = "" for option in keys: options += f" {option}" log.warning("'%s' is not a valid zone for %s.", input, zone) log.warning("(Valid options: %s)", options) self._printFallback(default)
#-------------------------------------------------------------------------#
[docs] def _buildDescription(self)->Dict[str,str]: """Make a dictionary of all named zones that inputs fall into.""" description = {} #Speed for k,v in self.v_spdZones.items(): if ((self.v_spd >= v[0]) and (self.v_spd <= v[1]) and (k != 'any')): description['spd'] = k #dSpeed for k,v in self.v_dvZones.items(): if ((self.v_dv >= v[0]) and (self.v_dv <= v[1]) and (k != 'any')): description['dSpd'] = k #dtSpeed for k,v in self.v_dtZones.items(): if ((self.v_dt >= v[0]) and (self.v_dt <= v[1]) and (k != 'any')): description['dtSpd'] = k #Angle for k,v in self.b_angZones.items(): if ((self.b_ang >= v[0]) and (self.b_ang <= v[1]) and (k != 'any')): description['ang'] = k #dAngle for k,v in self.b_dbZones.items(): if ((self.b_db >= v[0]) and (self.b_db <= v[1]) and (k != 'any')): description['dAng'] = k #dtAngle for k,v in self.b_dtZones.items(): if ((self.b_dt >= v[0]) and (self.b_dt <= v[1]) and (k != 'any')): description['dtAng'] = k return description
#-------------------------------------------------------------------------#
[docs] def _genRNGStartState(self, seed:int)->None: """Return a numpy bit generator state from a seed.""" return np.random.default_rng(seed).bit_generator.state
#-------------------------------------------------------------------------#
[docs] def _genValueList(self, mean:float, stddev:float, dt:float, n:int, h:float)->NPFltArr: """Return a list of values that fluctuate around the mean.""" # Make empty list vals = [] # Choose random starting direction start = self._rng.choice([1,-1]) # Make half-period oscillation array n_halfT = int(dt/h) oscillation = start * np.sin(np.linspace(0, np.pi, n_halfT)) # Starting amplitude, as offset from mean a_0 = self._getOffset(mean, stddev) for i in range(-(n // -n_halfT)): """This bizarre expression in range() does 'upside-down floor division', and that bizarre statement means I'm doing ceiling division in the negative numbers. The reason for this is to perform the ceil() operation without a function call and to return an integer. Fun! The point being to run the loop enough times to fill the speed list array with n or more elements.""" # Flip direction flip = (-1.0)**i # Get next amplitude, as offset from mean a_1 = self._getOffset(mean, stddev) # Make amplitude array a = np.linspace(a_0, a_1, n_halfT) # Store amplitude a_0 = a_1 # Generate oscillations from mean for half period vals.extend(mean + (a * (flip * oscillation))) return np.array(vals[:n])
#-------------------------------------------------------------------------#
[docs] def _genValue(self, mean:float, stddev:float, dt:float, n:int=np.inf, h:float=0.02)->Generator: """Generator function that yields values rather than a complete list.""" # Choose random starting direction start = self._rng.choice([1,-1]) # Calculate iteration angular frequency n_halfT = int(dt/h) omega_h = start * (np.pi / n_halfT) # Get amplitude generator, as offset from mean a_0 = self._getOffset(mean, stddev) a_1 = self._getOffset(mean, stddev) step = (a_1-a_0) / (n_halfT-1) # Generate oscillations i = 0 while i <= n: a = a_0 + step*(i % n_halfT) yield mean + ((a) * np.sin(omega_h*(i))) i += 1 if (i % (n_halfT) == 0): # Get next amplitude a_0 = a_1 a_1 = self._getOffset(mean, stddev) step = (a_1-a_0) / (n_halfT-1)
###############################################################################
[docs]class Floor: """ Ocean floor depth map generated from 2D Perlin noise. Procedurally generates realistic ocean floor terrain using Perlin noise algorithms. Provides smooth, continuous depth variations suitable for AUV navigation, sensor simulation, and collision detection. Parameters ---------- z : float, default=125 Shallowest floor depth in meters (minimum z value in depth array). Represents highest point of terrain relative to sea surface. z_range : float, default=10 Vertical range of depth variation in meters. Maximum depth = z + z_range. size : int, default=1000 Side length of square floor area in meters. Generates size x size depth array. origin : list of float, default=[500, 500] Coordinates [x, y] where (0, 0) maps to floor array center. style : str, default='linear' Depth mapping style. Determines how the depth array is constructed. 'flat' produces uniform depth without Perlin noise; all other styles map Perlin noise values onto z_range. **kwargs Additional optional keyword arugments. **Perlin noise parameters** - seed : int - Same seed produces idential terrain (default: 0) - random : bool - True generates random seed (default: False) - scale : int - Spatial frequency scale (default: 0.3*size) - octaves : int - Number of noise layers (default: 3) - persistence : float - Amplitude decay factor (default: 0.5) See PerlinNoise documentation for detailed parameter descriptions. Attributes ---------- z : float Minimum floor depth (shallowest) in meters. z_range : float Depth variation range (max - min) in meters. size : int Floor array side length in meters. origin : list of float Origin coordinates [x, y] for indexing. style : str Terrain generation style. depth : ndarray, shape (size, size) 2D array of floor depths in meters. perlin : PerlinNoise Perlin noise generator instance. Methods ------- __call__(x, y) : Floor instance is callable: query floor depth at (x, y) position. samplePoints(x, y) : Sample depth values at list of points. sampleGrid(x, y) : Sample depth at grid of points. sampleRegion(x, y) : Sample complete subregion of depth map. createMap(style, kwargs) : Create a depth map from 2D array using specified style. xy2Index(x, y) : Convert END coordinates (x, y) to array indices [j, i]. generate() : Precompute and cache the full depth array. display2D(z, dispType, path) : Display a 2D image of the depth array. display3D(z) : Display a 3D plot of the depth array. Notes ----- **Terrain Generation:** - **Perlin Noise Properties:** - Continuous: No discontinuities or sharp edges - Coherent: Nearby points have similar values - Multi-scale: Octaves add detail at different frequencies - Deterministic: Same seed produces identical terrain - Flexible evaluation: Supports both point-wise and vectorized queries - **Style Strategy:** Floor uses a strategy pattern to select depth mapping algorithms. The `style` parameter determines which mapping function is applied: - 'flat': Uniform depth array (ignores Perlin noise) - 'linear': Linear stretch over z range - 'sigmoid': Smooth transitions via sigmoid function - 'shelf': Asymmetric scaling for shelf-like features Future styles can be added by implementing new `_map*()` methods and registering in `_styleStrategies` dictionary. - **Lazy Tile-Based Generation:** The depth array is not precomputed during construction. Instead, the floor uses lazy evaluation with tile-based caching: - Array divided into square tiles - Tiles generated on-demand when first accessed via queries - Generated tiles are cached for subsequent access - Full array is only built when explicitly accessed via `depth` property **Coordinate System:** Floor uses END (East-North-Down) convention: - x: East coordinate (meters) - y: North coordinate (meters) - z: Depth below surface (positive down) **Callable Interface:** Floor objects are callable for convenient depth queries: >>> floor = Floor(z=100, z_range=20) >>> depth_at_origin = floor(0, 0) >>> depth_at_point = floor(200, 300) **Indexing and Origin:** The origin parameter allows coordinate systems with negative values. >>> floor = Floor(size=1000, origin=[500, 500]) >>> # Query at (0, 0) maps to array index [500, 500] >>> # Query at (-250, 300) maps to array index [250, 800] **Boundary Behavior:** Queries outside [0, size] range return valid values. The indexing is allowed to wrap around and return values at [x mod size]. This prevents stopping a simulation run due to bad indexing and effectively treats the floor map as though it were wrap-around. >>> floor = Floor(size=1000) >>> d = floor(1100, 500) # Outside bounds >>> floor(1100, 500) == floor(100, 500) # 1100 % 1000 = 100 True **Resampling:** Changing z, z_range, size, style, or perlin instance invalidates the tile cache, causing any previously generated depth terrain to require regeneration. >>> floor = Floor(size=1000) >>> floor.size = 2000 # Clears cache, regenerates terrain at new size Examples -------- ### Create default ocean floor: >>> floor = Floor() >>> print(f"Depth range: {floor.z}-{floor.z + floor.z_range} m") Depth range: 125-135 m >>> print(f"Area: {floor.size}m x {floor.size}m") Area: 1000m x 1000m ### Custom terrain parameters: >>> floor = Floor( ... z=80, # Shallowest at 80m ... z_range=40, # Varies 80-120m ... size=2000, # 2km x 2km area ... origin=[1000, 750], # x: [-1000 ... 999], y: [-750 ... 1249] ... seed=123, # Seed value for reproducible noise ... style='sigmoid' # S-curve mapping from noise to depth ... ) ### Query single point: >>> floor = Floor(randomFloor=True) >>> depth = floor(250, 300) # At (250m E, 300m N) >>> print(f"Floor depth: {depth:.2f} m") ### Query multiple points: >>> x_coords = [0, 100, 200, 300] >>> y_coords = [0, 100, 200, 300] >>> depths = floor.samplePoints(x_coords, y_coords) >>> print(depths) [132.89879249 130.43546153 127.1800901 130.33548963] >>> >>> depth_grid = floor.sampleGrid(x_coords, y_coords) >>> print(depth_grid) [[132.89879249 129.99690006 130.73963137 130.51609651] [131.90777505 130.43546153 129.22941748 128.60290109] [132.39564569 130.67040518 127.1800901 131.69874289] [130.73963137 131.23426993 129.85516039 130.33548963]] ### Visualize terrain: >>> floor.display2D() >>> floor.display3D() ### Different terrain styles: >>> # Linear (default) >>> floor = Floor(z=100, z_range=30) >>> >>> # Sigmoid >>> floor_sigmoid = Floor(z=100, z_range=30, style='sigmoid') >>> # Customized sigmoid >>> sigmoid_map = floor_sigmoid.createMap(k=10.0) >>> >>> # Shelf, updated from style >>> floor.style = 'shelf' # resets depth with default k >>> # Customize and save to depth >>> floor.depth = floor.createMap(k=5.0) See Also -------- PerlinNoise : 2D Perlin noise generator used internally Ocean : Container class that manages Floor instance navigation.OceanDepthSensor : Sensor that samples floor depth Warnings -------- - Queries outside floor bounds wrap around the array """ ## Constructor ===========================================================#
[docs] def __init__(self, z:Number=125, z_range:Number=10, size:int=1000, origin:List[float]=[500,500], style:str='linear', **kwargs ): """ Construct ocean floor with procedurally generated terrain from Perlin noise. Creates a realistic bathymetric depth map using Perlin noise generation with specified depth range, spatial extent, and terrain characteristics. Supports deterministic (seeded) or randomized terrain generation. Parameters ---------- z : float, default=125 Shallowest floor depth in meters (minimum depth in generated map). Represents the highest point of seafloor relative to surface (z=0). Typical values: - Shallow coastal: 10-50m - Continental shelf: 50-200m - Slope regions: 200-1000m - Deep ocean: >1000m Must be non-negative. No upper limit enforced. z_range : float, default=10 Vertical range of depth variation in meters. Maximum depth = z + z_range. Controls terrain "roughness": - Smooth terrain: 5-15m - Moderate terrain: 15-30m - Rough terrain: 30-50m - Extreme terrain: >50m Must be non-negative. Typical: 10-30m for coastal AUV operations. size : int, default=1000 Side length of square floor area in meters. Generates size * size depth array (square domain only). Typical values: - Small test area: 500m - Standard operations: 1000-2000m - Large survey: 3000-10000m Must be positive integer. origin : list of float, default=[500, 500] Coordinates [x_origin, y_origin] where (x=0, y=0) maps in depth array. Allows coordinate systems with negative values: - [500, 500]: (0,0) at array center for 1000m size - [0, 0]: (0,0) at array corner (all positive coords) - [1000, 1000]: Allows negative coords from -1000 to 0 Typically set to [size/2, size/2] for centered operations. style : str, default='linear' Terrain generation style (how Perlin noise maps to depth). Available styles supported: - 'flat': uniform depth with no variation (ignores Perlin noise) - 'linear': linear scaling across z_range - 'sigmoid': smooth s-curve transitions - 'shelf': asymmetric scaling Possible future styles: - 'exponential': For canyons/trenches - 'bimodal': For plateaus and valleys - 'terraced': For step-like formations Invalid style logs warning and fallback to default. **kwargs Additional optional keyword arguments. **Perlin noise parameters** - seed : int - Same seed produces idential terrain (default: 0) - random : bool - True generates random seed (default: False) - scale : int - Spatial frequency scale (default: 0.3*size) - octaves : int - Number of noise layers (default: 3) - persistence : float - Amplitude decay factor (default: 0.5) See PerlinNoise documentation for detailed parameter descriptions. Attributes ---------- z : float Minimum depth (stored, invalidates depth cache on change). z_range : float Depth variation range (stored, invalidates depth cache on change). size : int Floor area dimension (stored, triggers regeneration on change). origin : list of float Origin coordinates (stored directly). style : str Terrain style (stored, triggers PerlinNoise and method assignment). depth : ndarray, shape (size, size) Final depth map in meters. Only fully computed on explicit access. perlin : PerlinNoise Perlin noise generator instance containing: - noise: Normalized 2D array [0, 1] - scale, octaves, persistence, seed: Generation parameters Set to None if style is 'flat'. Notes ----- **Generation Process** 1. **Parameter Storage:** Store z, z_range, size, origin, style directly as managed properties or attributes. 2. **Perlin Noise Generation:** Create PerlinNoise instance: >>> self.perlin = PerlinNoise( ... size=size, ... **perlinKwargs # scale, octaves, persistence, seed, random ... ) This generates normalized noise array in [0, 1] on-demand. 3. **Style Strategy Assignment:** Validates style parameter and initializes strategy dictionary: >>> self._styleStrategies = { ... 'flat': self._mapFlat, ... 'linear': self._mapLinear, ... 'sigmoid': self._mapSigmoid, ... 'shelf': self._mapShelf ... } >>> self._style = self._validateStyle(style) Invalid style falls back to 'linear' with warning. 4. **Lazy Evaluation Ready:** No depth values are computed during initialization. The Floor instance is ready to generate and cache regions of the depth array on-demand: - Queries by `__call__()`, `samplePoints()`, `sampleGrid()`, etc. - Full array accessed via `depth` property. - Explicitly generated in full with `generate()` method. **Managed Property Behavior** `z`, `z_range`, `size`, `style`, and `perlin` are managed properties. Setting any of them at runtime has side effects: - `z`, `z_range`: Invalidate the depth cache; depth is recomputed on next access. - `size`: Rebuilds the Perlin instance (preserving seed and noise parameters) at the new size, which in turn resets the depth cache. For `style='flat'`, only `_size` is updated. - `style`: Validates and assigns the new style, resets the depth cache, and clears or assigns the PerlinNoise instance. - `perlin`: Syncs `_tileSize` and resets the depth cache. **Style Strategy Pattern** The style setter builds the strategy dictionary and if new assignment is made, clears the depth array cache. **Adding New Styles:** 1. Implement new `_map*()` method with signature: `_map*(self, noise=None, z=None, z_range=None, **kwargs) -> NPFltArr` 2. Register in `style.setter`: >>> self._styleStrategies = { ... 'flat': self._mapFlat, ... 'linear': self._mapLinear, ... 'sigmoid': self._mapSigmoid, ... 'shelf': self._mapShelf, ... 'newstyle': self._mapNewStyle # Add here ... } 3. Update `createMap()` and `_validateStyle()` docstrings with new option. **Integration with Ocean:** Ocean constructor passes parameters: >>> ocean = Ocean( ... size=2000, # Passed to Floor as size ... origin=[1000, 1000], # Passed to Floor as origin ... z=150, # Passed to Floor as z ... z_range=25, # Passed to Floor as z_range ... floorSeed=123, # Passed as seed ... randomFloor=False # Passed as random ... ) >>> ocean.floor.size 2000 See Also -------- PerlinNoise.__init__ : Noise generation parameters and process Floor.createMap : Converts noise array to depth array Ocean.__init__ : Creates Floor with oceanographic parameters Examples -------- ### Minimal initialization: >>> import munetauvsim.environment as env >>> floor = env.Floor() >>> print(f"Depth range: {floor.z} to {floor.z + floor.z_range} m") Depth range: 125 to 135 m >>> print(f"Area: {floor.size} m") Area: 1000 m ### Custom shallow coastal terrain: >>> floor = env.Floor( ... z=30, # 30m shallowest ... z_range=20, # Varies 30-50m ... size=2000, # 2km * 2km ... origin=[1000, 1000], ... seed=42, ... style='shelf', # Flat low areas with steep raised regions ... scale=4000, # Broader perlin noise features ... octaves=5, # More perlin noise detail layers ... persistence=0.6 # Rougher perlin noise terrain ... ) >>> floor.display3D() ### Random terrain for testing: >>> floor = env.Floor( ... z=100, ... z_range=30, ... random=True # Unique terrain each run ... ) >>> print(f"Generated with seed: {floor.perlin.seed}") ### Access Perlin noise directly: >>> floor = env.Floor(seed=42) >>> noise = floor.perlin.noise # Raw Perlin [0, 1] >>> print(f"Noise range: [{noise.min():.3f}, {noise.max():.3f}]") Noise range: [0.000, 1.000] >>> # Manually transform >>> custom_depth = noise * 50 + 100 # 100-150m range ### Query depth after creation: >>> floor = env.Floor(z=120, z_range=20) >>> depth_at_origin = floor(0, 0) >>> print(f"Depth at (0, 0): {depth_at_origin:.2f} m") >>> # Sample multiple points >>> depths = floor.samplePoints([0, 100, 200], [0, 100, 200]) >>> print(f"Depths: {depths}") """ # Floor parameters self.z = z self.z_range = z_range self.size = size self.origin = origin # Perlin parameters self._perlinKwargs = { # Default values 'seed': 0, 'scale': 0.3 * self.size, 'octaves': 3, 'persistence': 0.5, 'random': False, # Overwrite defaults if parameters passed in kwargs **{k:v for k,v in kwargs.items() if k in {'seed', 'scale', 'octaves', 'persistence', 'random'}} } # Depth array parameters self.style = style
## Properties ============================================================# @property def depth(self)->NPFltArr: """ Full depth array with lazy precomputation. On first access, builds full array from cached/generated tiles. Subsequent access returns cached array (instant). Notes ----- **Simulation queries** (e.g. `floor(x,y)`) do not use this property. They use tile cache directly. **Visualization queries** (e.g. `display3D()`) trigger this property. """ if self._depth is None: log.info(f"Computing full depth array ({self.size}x{self.size})") self._depth = self._buildFullDepth() return self._depth @depth.setter def depth(self, value: NPFltArr)->None: """Assign depth array directly and invalidate prior tile cache.""" self._resetDepthCache() self._depth = value #-------------------------------------------------------------------------- @property def z(self)->Number: """Minimum floor depth in meters.""" return self._z @z.setter def z(self, z:Number)->None: """Set minimum floor depth and invalidate depth cache.""" self._resetDepthCache() self._z = z #-------------------------------------------------------------------------- @property def z_range(self)->Number: """Depth range in meters. Maximum depth is `z + z_range`.""" return self._z_range @z_range.setter def z_range(self, z_range:Number)->None: """Set depth range and invalidate depth cache.""" self._resetDepthCache() self._z_range = z_range #-------------------------------------------------------------------------- @property def size(self)->int: """Length of one side of floor area (m).""" return self._size @size.setter def size(self, size:int)->None: """Set size and regenerate Perlin with new size. Assumes same origin.""" if (('_style' in self.__dict__) and (self._style != 'flat') and ('_perlin' in self.__dict__) and (self._perlin is not None)): # Use perlin property setter for automatic sync self.perlin = PerlinNoise(size=size, scale=self.perlin.scale, octaves=self.perlin.octaves, persistence=self.perlin.persistence, seed=self.perlin.seed, random=False) self._size = size #-------------------------------------------------------------------------- @property def style(self)->str: """Defines how noise array is scaled to create depth map.""" return self._style @style.setter def style(self, style:str)->None: """Set depth mapping style, invalidate depth cache, and assign Perlin""" # Define available strategies self._styleStrategies = { 'flat': self._mapFlat, 'linear': self._mapLinear, 'sigmoid': self._mapSigmoid, 'shelf': self._mapShelf, } # Validate and assign input style = self._validateStyle(style) self._style = style # Set PerlinNoise instance ## Note: Style change requires call to _resetDepthCache, but both of ## these branches call setters on managed properties and each of them ## independently resets the depth cache. No explicit call needed here. if (style == 'flat'): self._z_range_orig = self.z_range # cache original for later reuse self.z_range = 0 self.perlin = None else: if (self.z_range == 0): self.z_range = getattr(self, '_z_range_orig', 10) # reuse orig self.perlin = PerlinNoise(size=self.size, **self._perlinKwargs) #-------------------------------------------------------------------------- @property def perlin(self)->'PerlinNoise': """PerlinNoise generator for ocean floor terrain.""" return self._perlin @perlin.setter def perlin(self, pn:Optional['PerlinNoise'])->None: """Set PerlinNoise, sync tile size, and invalidate depth cache.""" self._perlin = pn self._tileSize = pn._tileSize if pn else self.size self._resetDepthCache() ## Special Methods =======================================================#
[docs] def __call__(self, x:Number, y:Number)->np.float64: """ Return depth value at (x,y) coordinates. Parameters ---------- x, y : float Coordinate point to sample. Returns ------- z : float Depth value at (x,y) in meters. Notes ----- - Uses tile cache for lazy evaluation. Tiles are generated on-demand and cached for efficiency. First query within a tile triggers depth array generation for that region, with results cached for subsequent queries. - Short-circuits for the 'flat' style by delegating to `_getFlatZ()`. Examples -------- >>> floor = Floor() # New default ocean floor object >>> z = floor(100,100) # Depth value at (x=100,y=100) """ # Check depth style if (self.style == 'flat'): return self._getFlatZ() # Convert to array indices col, row = self.xy2Index(x, y) # Clamp to valid range col = int(col) % self.size row = int(row) % self.size # Determine tile containing position tileI = col // self._tileSize tileJ = row // self._tileSize # Get cached or generate tile tile = self._getTile(tileI, tileJ) # Index within tile colLocal = col % self._tileSize rowLocal = row % self._tileSize return np.float64(tile[rowLocal, colLocal])
#--------------------------------------------------------------------------
[docs] def __repr__(self)->str: """Detailed description of Floor.""" perlin = f"{self.perlin!r}" if (self._perlin is not None) else "None" return ( f"{self.__class__.__name__}(" f"z={self.z}, " f"z_range={self.z_range}, " f"size={self.size}, " f"origin={self.origin}, " f"style='{self.style}', " f"perlin={perlin})" )
#--------------------------------------------------------------------------
[docs] def __str__(self)->str: """User friendly description of Floor.""" cw = 16 perlin = (f"{self.perlin}" if (self._perlin is not None) else f"{'Perlin':{cw}} None\n") return ( f"Floor\n" f"{' Size:':{cw}} {self.size} m\n" f"{' Origin:':{cw}} {self.origin}\n" f"{' Depth:':{cw}} {self.z} to {self.z + self.z_range} m\n" f"{' Style:':{cw}} {self.style}\n" f"\n{perlin}" )
## Methods ===============================================================#
[docs] def samplePoints(self, x:Union[List, NPFltArr], y:Union[List, NPFltArr] )->np.ndarray: """ Sample floor depth at list of specific coordinate points. Queries multiple (x,y) coordinates and returns corresponding depths. Efficient for sparse point sampling. Uses tile cache. Parameters ---------- x : list or ndarray Lists of x-coordinates [x0,...,xi]. y : list or ndarray Lists of y-coordinates [y0,...,yi]. Returns ------- z : ndarray 1D array of depth values at each point (xi,yi). Notes ----- **Tile-Based Lookup** Queries tile cache for each point. Tiles are generated on-demand and cached for future queries. **Out-of-Bounds Wrapping** Coordinates outside [0, size) wrap via modulo indexing. >>> floor = Floor(size = 1000) >>> z1 = floor(1100, 500) # Same as floor(100, 500) **Short Circuit:** If `style='flat'`, then the method short-circuits by delegating to `_mapFlat()`. Examples -------- >>> floor = Floor() >>> x = [100, 200, 300] >>> y = [150, 250, 350] >>> z = floor.samplePoints(x, y) # z is list of length 3 """ # Check depth style if (self.style == 'flat'): return self._mapFlat(np.asarray(x)) # Convert to indices x = np.asarray(x) y = np.asarray(y) col, row = self.xy2Index(x, y) # Determine tiles for each point iTiles = col // self._tileSize jTiles = row // self._tileSize # Collect unique tiles uniqueTiles = set(zip(iTiles, jTiles)) # Compute local indices colLocal = col % self._tileSize rowLocal = row % self._tileSize # Batch-process points by using mask to select points in current tile result = np.empty(x.shape, dtype=np.float64) for tileI, tileJ in uniqueTiles: tile = self._getTile(tileI, tileJ) mask = (iTiles == tileI) & (jTiles == tileJ) result[mask] = tile[rowLocal[mask], colLocal[mask]] return result
#--------------------------------------------------------------------------
[docs] def sampleGrid(self, x:Union[List, np.ndarray], y:Union[List, np.ndarray] )->np.ndarray: """ Sample floor depth at grid of coordinate points. Queries a regular Cartesian grid of (x,y) coordinates. Returns a 2D array with shape (len(y), len(x)). Optimized for uniform grids but handles irregular grids. Parameters ---------- x : list or ndarray X-coordinates [x0,...,xi] for grid point columns. Grid is formed by the Cartesian product (xi, yj). y : list or ndarray Y-coordinates [y0,...,yj] for grid point rows. Grid is formed by the Cartesian product (xi, yj). Returns ------- z : ndarray, shape (len(y), len(x)) 2D array of depth values at each grid point (xi,yj). z[j, i] = floor(x[i], y[j]). Notes ----- **Cartesian Product Convention:** For Cartesian product of x-coordinates and y-coordinates, standard numpy convention returns shape [len(y), len(x)]. This matches meshgrid output with default indexing='xy'. **Optimization Requirements:** To leverage fast array computations, the method requires: 1. All coordinates are within floor bounds (no wrapping) 2. Monotonically increasing order (in sorted order) 3. Uniform spacing (constant differences) If all requirements are met, a faster computation is used that relies on the sampleRegion() method. If any requirement fails, a slower computation is used that relies on the samplePoints() method. **Short Circuit:** If `style='flat'`, then the method short-circuits by delegating to `_mapFlat()`, passing a shaped empty array to ensure the output has the correct shape. Examples -------- >>> floor = Floor() >>> x = [100, 200, 300] >>> y = [150, 250, 350] >>> z = floor.sampleGrid(x, y) # z is array (3,3) """ # Check depth style if (self.style == 'flat'): return self._mapFlat(np.empty((len(y), len(x)))) # Check for coordinates out-of-bounds xArr, yArr = np.asarray(x), np.asarray(y) colRaw = (xArr + self.origin[0]).astype(int) rowRaw = (yArr + self.origin[1]).astype(int) inBounds = (np.all((colRaw >= 0) & (colRaw < self.size)) and np.all((rowRaw >= 0) & (rowRaw < self.size))) # Convert to indices col, row = self.xy2Index(xArr, yArr) # If in bounds, check if points form a uniform sampling region useOptimized = False if (inBounds): colDiff = np.diff(col) rowDiff = np.diff(row) colUniform = ((len(col) > 1) and (colDiff[0] > 0) and np.all(colDiff == colDiff[0])) rowUniform = ((len(row) > 1) and (rowDiff[0] > 0) and np.all(rowDiff == rowDiff[0])) useOptimized = (colUniform and rowUniform) # If in bounds and uniform, use sampleRegion then stride the grid points if (useOptimized): # Get full region # Add 1 since sampleRegion uses exclusive upper bounds xMin, xMax = xArr[0], xArr[-1] + 1 yMin, yMax = yArr[0], yArr[-1] + 1 region = self.sampleRegion([xMin, xMax], [yMin, yMax]) # Apply striding colStride = colDiff[0] if (len(col) > 1) else 1 rowStride = rowDiff[0] if (len(row) > 1) else 1 if ((colStride > 1) or (rowStride > 1)): region = region[::rowStride, ::colStride] return region # Else out-of-bounds or irregular grid so fall back to use samplePoints else: xx, yy = np.meshgrid(xArr, yArr) depths = self.samplePoints(xx.flatten(), yy.flatten()) return depths.reshape(xx.shape)
#--------------------------------------------------------------------------
[docs] def sampleRegion(self, x:Union[List[float], np.ndarray], y:Union[List[float], np.ndarray], )->np.ndarray: """ Extract depth map region within specified x,y coordinate boundaries. Samples a rectangular region of the depth map defined by x-bounds and y-bounds. Efficiently combines tiles from cache. Parameters ---------- x : list of float or ndarray Boundary pairs [xmin,xmax]. Defines East-West (horizontal) extent of region. y : list of float or ndarray Boundary pairs [ymin,ymax]. Defines North-South (vertical) extent of region. Returns ------- z : ndarray 2D array of depth map region. Notes ----- - Upper bounds of region are excluded from returned array: x=[0, 100], y=[0, 100] returns columns 0-99 (100 columns) and rows 0-99 (100 rows). - Short-circuits for the 'flat' style by delegating to `_mapFlat()`. Examples -------- >>> floor = Floor() >>> x = [100, 200] >>> y = [150, 250] >>> z = floor.sampleRegion(x, y) # z is array (100, 100) """ # Unpack boundaries xmin, xmax = x ymin, ymax = y # Convert boundaries to indices colMin, rowMin = self.xy2Index(xmin, ymin) colMax, rowMax = self.xy2Index(xmax, ymax) # Ensure proper ordering due to allowing array index wrapping colMin, colMax = min(colMin, colMax), max(colMin, colMax) rowMin, rowMax = min(rowMin, rowMax), max(rowMin, rowMax) # Allocate output array width = colMax - colMin height = rowMax - rowMin region = np.empty((height, width), dtype=np.float64) # Check depth style if (self.style == 'flat'): return self._mapFlat(region) # Determine tiles that cover requested region tileIMin = colMin // self._tileSize tileIMax = (colMax - 1) // self._tileSize tileJMin = rowMin // self._tileSize tileJMax = (rowMax - 1) // self._tileSize # Assemble region from tile cache for tileI in range(tileIMin, tileIMax + 1): for tileJ in range(tileJMin, tileJMax + 1): # Get tile from cache or generate tile = self._getTile(tileI, tileJ) # Convert tile bounds to array space colMinTile = tileI * self._tileSize colMaxTile = min(colMinTile + self._tileSize, self.size) rowMinTile = tileJ * self._tileSize rowMaxTile = min(rowMinTile + self._tileSize, self.size) # Calculate overlap between requested region and tile overlapColMin = max(colMin, colMinTile) overlapColMax = min(colMax, colMaxTile) overlapRowMin = max(rowMin, rowMinTile) overlapRowMax = min(rowMax, rowMaxTile) # Skip if no overlap if ((overlapColMin > overlapColMax) or (overlapRowMin > overlapRowMax)): continue # Calculate indices in tile space tColMin = overlapColMin - colMinTile tColMax = overlapColMax - colMinTile tRowMin = overlapRowMin - rowMinTile tRowMax = overlapRowMax - rowMinTile # Calculate indices in region space rColMin = overlapColMin - colMin rColMax = overlapColMax - colMin rRowMin = overlapRowMin - rowMin rRowMax = overlapRowMax - rowMin # Copy data from tile to region region[rRowMin:rRowMax, rColMin:rColMax] = \ tile[tRowMin:tRowMax, tColMin:tColMax] return region
#--------------------------------------------------------------------------
[docs] def createMap(self, style:Optional[str]=None, **kwargs)->NPFltArr: """ Create depth map from normalized 2D noise array, scaled by style. Parameters ---------- style : str Terrain generation style. Valid options: - "flat" : Flat plane - "linear" : Linear depth mapping (default) - "sigmoid" : Smooth transitions - "shelf" : Plateaus and ridges **kwargs : dict, optional Keyword arguments passed through to the depth generating method. Available for all styles: - noise : ndarray - Pre-computed noise array [0, 1]. Otherwises uses default perlin instance if available. Generates a temporary array on-demand from `_perlinKwargs` if not provided and none found. - z : float - Override minimum depth (m). Defaults to self.z. - z_range : float - Override depth range (m). Defaults to self.z_range. Ignored by 'flat'. Style-specific: - k : float - For 'sigmoid': transition steepness (default 5.0). For 'shelf': asymmetry strength (default 3.0). Returns ------- depth : ndarray Depth map in meters with shape (size, size) Notes ----- **Flat Floor with Noise-Based Style:** If the Floor instance is in `style='flat'` mode (`perlin=None`) but a noise-based style is requested, a temporary `PerlinNoise` instance is created using the stored `_perlinKwargs` (seed, scale, octaves, persistence, random) and discarded after use. This does not modify `self.perlin` or change the operating mode of the Floor. If making multiple calls to createMap with noise-based styles while in `style='flat'` mode, a new noise array is generated each time. To avoid this, either provide a pregenerated noise array or swith to a terrain Floor style to allow createMap to use the default perlin instance. If a `noise` array is supplied via `**kwargs`, no `PerlinNoise` instance is created regardless of floor mode. """ # Get strategy key from style strategy = self.style if (style is None) else self._validateStyle(style) # Ensure noise array is available in edge cases if ((strategy != 'flat') and (self.perlin is None) and ('noise' not in kwargs)): pn = PerlinNoise(size=self.size, **self._perlinKwargs) return self._styleStrategies[strategy]( noise = pn.evaluateRegion([0, self.size], [0, self.size]), **kwargs, ) return self._styleStrategies[strategy](**kwargs)
#--------------------------------------------------------------------------
[docs] def xy2Index(self, x:Union[float, NPFltArr], y:Union[float, NPFltArr], )->Tuple[Union[int, NPIntArr], Union[int, NPIntArr]]: """ Transform coordinates from (x,y) to array indices. Maps END coordinates (East, North) to array indices following [row, column] convention. Parameters ---------- x : list of float or ndarray X-coordinates relative to origin for depth array. x = East, increases rightward y : list of float or ndarray Y-coordinates relative to origin for depth array. y = North, increases upward Returns ------- col, row : int or ndarray Array column and row indices, ready for depth[row, col] access. col = column index (0..size-1), increases East. row = row index (0..size-1), increases North. Type matches input. If scalar inputs: (int, int), if array inputs: (ndarray, ndarray) Notes ----- **Coordinate System Convention (END):** - x increases East (rightward in visual display) - y increases North (upward in visual display) - Array indexing: depth[row, col] where row == y, col == x **Origin Mapping:** Origin attribute of Floor instance defines where the (0,0) point of the END x,y coordinates maps in the floor depth array. >>> floor = Floor(origin=[250,900]) >>> col, row = floor.xy2Index(0,0) # col = 250, row = 900 **Boundary Behavior - Coordinate Wrapping:** Queries outside the floor domain [0, size] are handled via modulo wrapping: - Coordinates automatically wrap around array boundaries - Prevents IndexError exceptions that would halt simulation - Effectively treats floor map as infinitely repeating **Rationale:** Negative array indexing in Python is valid (accesses from end), making lower-bound violations difficult to detect compared to upper-bound IndexError exceptions. Rather than implement costly bounds checking or allowing edge indexing errors stop the entire simulations, wrapping provides graceful degradation. **Impact on Simulation:** Wrapped coordinates may return unrealistic depth values for vehicles that stray far from intended operation areas. Users should validate that vehicle trajectories remain within expected floor domain. """ col = (np.asarray(x) + self.origin[0]).astype(int) % self.size row = (np.asarray(y) + self.origin[1]).astype(int) % self.size return col, row
#--------------------------------------------------------------------------
[docs] def generate(self)->None: """ Explicitly precomputes full depth array from Perlin noise. Forces generation of all tiles and caches the complete depth array. Subsequent accesses to depth property or tile-based queries will use cached data without generation overhead. Notes ----- Resets tile cache and depth data, and always regenerates from Perlin noise regardless of any prior state. A manually assigned depth array will be discarded. Useful for: - Precomputing before batch simulations - Forcing Perlin regeneration after parameter changes - Eliminating on-demand generation overhead Examples -------- >>> floor = Floor(size=5000) >>> floor.generate() # Precompute before simulation loop >>> sim.run() # depth array already cached >>> floor.display3D() # instant visualization """ self._resetDepthCache() self._depth = self._buildFullDepth()
#--------------------------------------------------------------------------
[docs] def display2D(self, z:Optional[NPFltArr]=None, dispType="contour", path:Optional[Waypoint]=None, )->None: """ Display 2D image of depth array. Parameters ---------- z : ndarray, optional Depth array to display. If None, uses self.depth. dispType : str, default='contour' Display style. - 'contour': A contour plot with labeled depth contours - 'cloud': Simple scaling to the color map. path : guidance.Waypoint, optional A waypoint object. Path described by waypoints is plotted over the depth map. """ if (z is None): z = self.depth # z.shape[1] is 'x' (columns), z.shape[0] is 'y' (rows) extent=[-self.origin[0], z.shape[1]-self.origin[0], -self.origin[1], z.shape[0]-self.origin[1]] plt.figure(figsize=(9,6)) # Simple scaling plot if (dispType == "cloud"): p = plt.imshow(z, extent=extent, origin='lower', cmap='viridis_r') plt.colorbar(p).ax.invert_yaxis() # Plot with countour lines else: plt.imshow(z, extent=extent, origin='lower', cmap='viridis_r', alpha=0.5) plt.colorbar().ax.invert_yaxis() x = np.linspace(extent[0], extent[1] - 1, z.shape[1]) y = np.linspace(extent[2], extent[3] - 1, z.shape[0]) contours = plt.contour(x, y, z, 10, colors='black', alpha=0.4) plt.clabel(contours, inline=True, fontsize=8, fmt="%.0f") # Lines at origin oc = 'gray' # origin color ow = 0.5 # origin width plt.axvline(x=0, linewidth=ow, color=oc) plt.axhline(y=0, linewidth=ow, color=oc) # Path of vehicle if (path is not None): pmc = "red" # path marker color plc = "black" # path line color plt.plot(path.pos.x, path.pos.y, linestyle="dotted", color=plc) plt.scatter(path.pos.x, path.pos.y, marker='^', color=pmc, s=64) plt.xlabel('X / East (m)') plt.ylabel('Y / North (m)') plt.show()
#--------------------------------------------------------------------------
[docs] def display3D(self, z:Optional[NPFltArr]=None)->None: """ Display 3D plot of depth array. Parameters ---------- z : ndarray, optional Depth array to display. If None, uses self.depth. """ if (z is None): z = self.depth # Set color map color = mpl.colormaps['terrain'] new_cmap = mpl.colors.ListedColormap(color(np.linspace(0.65,0.55,256))) # color = mpl.colormaps['gist_earth'] # new_cmap = mpl.colors.ListedColormap(color(np.linspace(0.9,0.8,256))) # color = mpl.colormaps['copper'] # new_cmap = mpl.colors.ListedColormap(color(np.linspace(0.7,0.9,256))) # color = mpl.colormaps['Wistia'] # new_cmap = mpl.colors.ListedColormap(color(np.linspace(0.9,0.25,256))) # Plot floor # z.shape[1] is 'x' (columns), z.shape[0] is 'y' (rows) x_coords = np.arange(z.shape[1]) - self.origin[0] y_coords = np.arange(z.shape[0]) - self.origin[1] x, y = np.meshgrid(x_coords, y_coords) fig = plt.figure(figsize=(9,9)) ax = fig.add_subplot(111, projection='3d') ax.set_zlim(-z.max(),0) ax.plot_surface(x, y, -z, alpha=0.9, cmap=new_cmap) # Add water surface ax.plot_surface(x, y, 0*z, alpha=0.3, color='blue') ax.set_xlabel('X / East (m)') ax.set_ylabel('Y / North (m)') ax.set_zlabel('Z / Down (m)') plt.show()
## Helper Methods ========================================================#
[docs] def _validateStyle(self, style:str)->str: """ Check if style is listed in style strategies dictionary. Parameters ---------- style : str Terrain generation style. Valid options: - "flat" : Flat plane - "linear" : Linear depth mapping (default) - "sigmoid" : Smooth transitions - "shelf" : Plateaus and ridges Returns ------- style : str, default = "linear" Validated style matching available strategy. Falls back to default if input is not valid. """ default = "linear" if (style not in self._styleStrategies): available = ", ".join(self._styleStrategies.keys()) msg = (f"'{style}' is not a valid terrain style. " f"Available options: {available}") log.warning(msg) log.warning("Proceeding with default: '%s'.", default) style = default return style
#--------------------------------------------------------------------------
[docs] def _getFlatZ(self)->np.float64: """Return self.z as numpy scalar for flat floor depth queries""" return np.float64(self.z)
#--------------------------------------------------------------------------
[docs] def _mapFlat(self, noise:Optional[NPFltArr]=None, z:Optional[Number]=None, **kwargs )->NPFltArr: """ Flat depth mapping with no depth variation. Parameters ---------- noise : ndarray, optional 2D array whose shape is used for the output. If None, output shape is (size,size). z : float, optional Constant depth value in meters. If None, uses self.z. kwargs Accepted but ignored. Allows uniform call signature with other depth mapping methods (e.g., via `createMap`). Returns ------- depth : ndarray Constant-valued depth array in meters. """ # Use size from noise array if provided shape = noise.shape if (noise is not None) else (self.size, self.size) z_val = self.z if (z is None) else z return np.full(shape, z_val, dtype=np.float64)
#--------------------------------------------------------------------------
[docs] def _mapLinear(self, noise:Optional[NPFltArr]=None, z:Optional[Number]=None, z_range:Optional[Number]=None, **kwargs )->NPFltArr: """ Linear depth mapping from 2D noise array. Maps normalized Perlin noise [0, 1] to depth range [z, z + z_range] using linear scaling: depth = noise * z_range + z Parameters ---------- noise : ndarray, optional 2D array normalized to [0, 1]. If None, generates via perlin.evaluateRegion() z : float, optional Minimum depth in meters. If None, uses self.z. z_range : float, optional Depth variation range in meters. If None, uses self.z_range. Returns ------- depth : ndarray Depth map in meters with shape (size, size) """ # Generate noise on-demand if not provided if noise is None: noise = self.perlin.evaluateRegion([0, self.size], [0, self.size]) z_val = self.z if (z is None) else z z_r = self.z_range if (z_range is None) else z_range return (noise * z_r) + z_val
#--------------------------------------------------------------------------
[docs] def _mapSigmoid(self, noise:Optional[NPFltArr]=None, z:Optional[Number]=None, z_range:Optional[Number]=None, **kwargs )->NPFltArr: """ Sigmoid depth mapping with smooth transitions. Creates smooth gradual transitions between shallow and deep regions using sigmoid function. Formula: depth = z + z_range / (1 + exp(-k*(2*noise - 1))) where k controls transition steepness (default k=5) Parameters ---------- noise : ndarray, optional 2D noise array normalized to [0, 1]. If None, generates via perlin.evaluateRegion() z : float, optional Minimum depth in meters. If None, uses self.z. z_range : float, optional Depth variation range in meters. kwargs : dict, optional Optional keyword arguments: - k : float, steepness parameter (default 5.0) Returns ------- depth : ndarray Depth map with smooth sigmoid transitions """ # Generate noise on-demand if not provided if noise is None: noise = self.perlin.evaluateRegion([0, self.size], [0, self.size]) z_val = self.z if (z is None) else z zr = self.z_range if (z_range is None) else z_range k = kwargs.get('k', 5.0) return z_val + zr / (1.0 + np.exp(-k * (2.0 * noise - 1.0)))
#--------------------------------------------------------------------------
[docs] def _mapShelf(self, noise: Optional[NPFltArr] = None, z: Optional[Number] = None, z_range: Optional[Number] = None, **kwargs )->NPFltArr: """ Continental shelf mapping with asymmetric depth scaling. Creates terrain with pronounced raised features while keeping deep areas relatively flat and uniform. Transformation splits at midpoint (noise=0.5). Upper half scaled with cubic expansion, and lower half with compression. Formula: For n >= 0.5 (shallow): scaled = 0.5 + 2*(n - 0.5)^3 For n < 0.5 (deep): scaled = 2*n^3 This creates depth = z + z_range * scaled Parameters ---------- noise : ndarray, optional 2D array normalized to [0, 1]. If None, generates via perlin.evaluateRegion(). z : float, optional Minimum depth in meters. If None, uses self.z. z_range : float, optional Depth variation range in meters. If None, uses self.z_range. kwargs : dict, optional Optional keyword arguments: - k : float, asymmetry strength (default 3.0) Returns ------- depth : ndarray Depth map with shelf-like features """ # Generate noise on-demand if not provided if noise is None: noise = self.perlin.evaluateRegion([0, self.size], [0, self.size]) z_val = self.z if (z is None) else z zr = self.z_range if (z_range is None) else z_range k = kwargs.get('k', 3.0) # Split transformation at midpoint midpoint = 0.5 # Upper half: Expand (more variation) maskUpper = noise >= midpoint upper = midpoint + 2.0 * ((noise - midpoint) ** k) # Lower half: Compress (less variation) lower = 2.0 * (noise ** k) # Combine branches scaled = np.where(maskUpper, upper, lower) return z_val + zr * scaled
#--------------------------------------------------------------------------
[docs] def _buildFullDepth(self)->NPFltArr: """ Construct full depth array from tile cache and new tiles. Iterates through floor tiles, generating and caching depth tiles as needed. Combines into single numpy array. Returns ------- depth : ndarray Fully computed depth array. Notes ----- Short-circuits for the 'flat' style by delegating directly to `_mapFlat()`. """ depth = np.zeros((self.size, self.size), dtype=np.float64) # Check depth style if (self.style == 'flat'): return self._mapFlat(depth) nTilesI = (self.size + self._tileSize - 1) // self._tileSize nTilesJ = (self.size + self._tileSize - 1) // self._tileSize for tileI in range(nTilesI): for tileJ in range(nTilesJ): # Get or generate tile tile = self._getTile(tileI, tileJ) # Determine position in full array iStart = tileI * self._tileSize jStart = tileJ * self._tileSize iEnd = min(iStart + self._tileSize, self.size) jEnd = min(jStart + self._tileSize, self.size) tileHeight = jEnd - jStart tileWidth = iEnd - iStart # Write tile to full array depth[jStart:jEnd, iStart:iEnd] = tile[:tileHeight, :tileWidth] return depth
#--------------------------------------------------------------------------
[docs] def _getTile(self, tileI:int, tileJ:int)->NPFltArr: """ Get or generate a cached tile of the depth array. Parameters ---------- tileI, tileJ : int Tile grid column, row indices. Returns ------- tile : ndarray, shape (tileSize, tileSize) Depth values for this tile in meters. Notes ----- **Tile Grid:** The floor size is divided into a whole number of uniformly sized tiles, forming a square grid. For example, a floor size of 2000 m may use a tile size of 500 m, forming a grid of 4 x 4 tiles. The tile size is provided by the perlin instance. **Tile Caching:** Tiles are cached in the tiles dictionary, with the key formed from the tile grid indices: tiles[(tileI, tileJ)]. Requesting a tile that has not been generated will trigger the tile to be evaluated and cached for future re-use. Otherwise, subsequent access returns the cached copy immediately. The tile cache is cleared when size, style, or the perlin instance are changed. See Also -------- PerlinNoise._calcTileSize : Determines tile size from array size PerlinNoise.evaluateRegion : Computes perlin noise over region createMap : Applys depth mapping to convert noise to depth array """ # Verify tile grid indices if ((max(tileI, tileJ) >= self.size // self._tileSize) or (min(tileI, tileJ) < 0)): log.warning( "Bad tile index: (%s, %s). Returning tile of z=%s values.", tileI, tileJ, self.z ) return np.full((self._tileSize, self._tileSize), self.z, dtype=np.float64) # Check tile cache key = (tileI, tileJ) if (key not in self._tiles): # Determine tile bounds iStart = tileI * self._tileSize jStart = tileJ * self._tileSize iEnd = min(iStart + self._tileSize, self.size) jEnd = min(jStart + self._tileSize, self.size) if (self._depth is not None): # Slice tile from existing depth array tile = self._depth[jStart:jEnd, iStart:iEnd].copy() else: # Evaluate noise region noise = self.perlin.evaluateRegion( [iStart, iEnd], [jStart, jEnd] ) # Convert noise to depth tile = self.createMap(noise=noise) # Cache self._tiles[key] = tile return self._tiles[key]
#--------------------------------------------------------------------------
[docs] def _resetDepthCache(self)->None: """ Clear the tile cache and full depth array. Resets both the tile dictionary and cached full depth array to their uninitialized states, forcing re-generation on next access. Notes ----- Should be called whenever any parameter that affects depth values changes — such as `size`, `style`, or the `perlin` instance — to ensure stale cached data is not returned. """ self._tiles = {} self._depth = None
###############################################################################
[docs]class PerlinNoise: """ 2D Perlin noise generator for procedural terrain and environmental features. Implements Ken Perlin's gradient noise algorithm to generate smooth, continuous pseudo-random patterns suitable for natural-looking ocean floor terrain maps and other spatial environmental features. Supports multi-octave generation for fractal detail, reproducible seeding for consistent terrains, and memory-efficient tile-based lazy evaluation. Parameters ---------- size : int Side length of square noise array to generate (array dimensions: size x size). scale : int, default=300 Spatial frequency scale in array units. Determines feature size: - Larger scale: Broader, smoother features (low frequency) - Smaller scale: Finer, more detailed features (high frequency) - Feature wavelength ≈ scale/size when octaves=1 Typical values: 100-500 for natural-looking ocean floors. octaves : int, default=3 Number of noise layers to combine (fractal octaves). Each octave adds detail at progressively higher frequencies: - octaves=1: Single frequency (smooth, uniform) - octaves=3-5: Natural terrain detail (recommended) - octaves>7: Extremely detailed, may appear noisy persistence : float, default=0.5 Amplitude decay factor for successive octaves. Controls roughness: - persistence=0.5: Balanced detail (recommended) - persistence<0.5: Smoother, less detailed terrain - persistence>0.5: Rougher, more chaotic terrain Each octave i has amplitude = persistence^i. seed : int, default=0 PRNG seed for permutation table generation. Ensures reproducibility: - Same seed -> identical noise pattern - Different seed -> completely different pattern - seed=0 is valid and deterministic random : bool, default=False If True, generates and uses random seed from entropy pool. If False, uses provided seed parameter. Attributes ---------- noise : ndarray, shape (size, size) Generated 2D Perlin noise array, normalized to [0, 1]. Ready for transformation into depth maps or other applications. p : ndarray, shape (512,), dtype=int Permutation table containing shuffled integers [0, 255] repeated twice. Defines gradient vector selection pattern. Derived from seed. rng : numpy.random.Generator Random number generator instance for permutation table creation. size : int Array dimension (stored for reference and reconstruction). scale : int Frequency scale parameter (stored for reconstruction). octaves : int Number of noise layers combined (stored for reconstruction). persistence : float Amplitude decay factor (stored for reconstruction). seed : int PRNG seed used for this instance (stored for reconstruction). Methods ------- __call__(i, j) Sample Perlin noise at position (i, j). Alias for sample() method. Makes PerlinNoise instance directly callable. sample(i, j) Sample Perlin noise at position (i, j) and cache result. Lazy evaluation with tile-based caching for memory efficiency. sampleRegion(i, j) Sample Perlin noise in region (iMin:iMax, jMin:jMax) and cache result. Generates and caches tiles containing the requested region. evaluate(i, j) Evaluate Perlin noise at position (i, j) without caching. Direct computation suitable for one-time queries. evaluateRegion(i, j) Evaluate Perlin noise in region (iMin:iMax, jMin:jMax) without caching. Computes noise over specified bounds without tile storage. generate() Generate and cache Perlin noise for entire array. Pre-computes all tiles to eliminate on-demand generation overhead. Notes ----- **Features:** - **Coherent Noise:** Smooth, continuous patterns without discontinuities - **Multi-Octave Detail:** Layered frequencies for fractal-like complexity - **Lazy Evaluation:** Tile-based on-demand generation for large arrays - **Reproducible:** Seed-based determinism for consistent terrains - **Normalized Output:** Values guaranteed in [0, 1] range **When to Use:** - Ocean floor depth map generation - Procedural environmental feature generation - Spatial noise patterns for sensor simulation - Any application requiring coherent 2D pseudo-random fields **Lazy Tile-Based Generation:** Small arrays are fully computed during initialization. Large arrays use on-demand tile generation to avoid upfront memory costs: - Tiles are generated when first accessed by sample methods - Cached for instant subsequent access - Full array precomputation by generate() if needed - Tile size is automatically optimized for array dimensions **2D Only:** Current implementation is focused on ocean floor applications: - No 3D/4D support (possible future extension) - Square arrays only **Perlin Noise Algorithm:** - **Gradient Noise Principles:** Perlin noise generates coherent pseudo-random values by: 1. Dividing space into grid of unit cubes 2. Assigning random gradient vectors to grid corners by permutation table 3. Computing dot products between gradients and position offsets 4. Interpolating dot products using smooth fade function - **Multi-Octave Generation:** Fractal detail is achieved by summing multiple octaves: noise(x, y) = sum (amplitude_i * perlin_i(x,y)) where: - sum is from i=0 to octaves-1 - amplitude_i = persistence^i - perlin_i: Base Perlin noise at frequency_i - frequency_i = (size / scale) * 2^i Lower octaves contribute large-scale features, higher octaves add fine detail. - **Interpolation (Fade Function):** Uses improved Perlin smoothing curve for continuity: fade(t) = 6t^5 - 15t^4 + 10t^3 Properties: - f(0) = 0, f(1) = 1 (boundary conditions) - f'(0) = f'(1) = 0 (smooth derivatives) - f''(0) = f''(1) = 0 (smooth curvature) - **Normalization:** Output is linearly scaled to [0, 1]: normalized = (noise - min(noise)) / (max(noise) - min(noise)) **Gradient Vector Selection:** Permutation table p maps grid coordinates to gradient indices: - p contains [0-255] shuffled, repeated twice (length 512) - Hash function: h = p[p[xi] + yi] determines gradient - Four possible gradients: [(0,1), (0,-1), (1,0), (-1,0)] **Alternative Implementations Considered:** Alternative methods for Perlin noise creation exist, but in testing were slower or more complicated than what is needed here. For example, - opensimplex: can do 3D, - pythonperlin: can do 3D, offers seamless tiling - noise: does not work with numpy arrays - nlmpy: did not explore, looks extensive Current implementation balances simplicity, speed, and integration. Warnings -------- - Does not support non-square arrays - Changing scale/octaves/persistence with same seed creates different patterns See Also -------- Floor : Uses PerlinNoise to generate ocean floor depth maps Floor.createMap : Transforms normalized noise into depth values References ---------- [1] Producing 2D Perlin noise with numpy https://stackoverflow.com/questions/42147776/producing-2d-perlin-noise-with-numpy [2] Perlin Noise (with implementation in Python) https://iq.opengenus.org/perlin-noise/ [3] Adrian's Blog: Perlin Noise Explanation https://adrianb.io/2014/08/09/perlinnoise.html [4] Red Blob Games: Terrain Generation from Noise https://www.redblobgames.com/maps/terrain-from-noise/ Examples -------- ### Basic noise generation (small array): >>> small_pn = PerlinNoise(size=500, scale=300, octaves=3, seed=42) >>> noise_array = small_pn.noise # Instant (already generated) >>> print(f"Array shape: {noise_array.shape}") Array shape: (500, 500) >>> print(f"Value range: [{noise_array.min():.3f},{noise_array.max():.3f}]") Value range: [0.000, 1.000] ### Accessing noise (large array): >>> large_pn = PerlinNoise(size=5000, seed=42) >>> large_pn.generate() # Builds from tiles (may take time) >>> noise_array = large_pn.noise >>> # Or sample specific points without full generation: >>> value = large_pn.sample(2500, 2500) # Only generates needed tile ### Effect of persistence: >>> smooth = PerlinNoise(size=500, persistence=0.3, seed=50) >>> rough = PerlinNoise(size=500, persistence=0.7, seed=50) >>> # smooth.noise has gentler variations >>> # rough.noise has sharper, more chaotic features ### Random unique terrains: >>> for i in range(5): ... unique = PerlinNoise(size=500, random=True) ... print(f"Terrain {i+1} seed: {unique.seed}") Terrain 1 seed: 187239847293847 Terrain 2 seed: 928374619283746 # ... each has unique seed and pattern ### Custom frequency and detail: >>> # Fine detail with high frequency >>> fine = PerlinNoise(size=1000, scale=100, octaves=6) >>> >>> # Broad features with low frequency >>> broad = PerlinNoise(size=1000, scale=600, octaves=2) """ ## Class Attributes ======================================================# _GRADIENTS = np.array([[0, 1], [0, -1], [1, 0], [-1, 0]], dtype=float) _TILE_SIZE_STD = 500 # Target tile size for large arrays _TILE_SIZE_PRE = 1000 # Max array size for full precomputation _TILE_SENTINEL = 0.5 # Neutral fill value for ungenerated tiles ## Constructor ===========================================================#
[docs] def __init__(self, size:int, scale:int=300, octaves:int=3, persistence:float=0.5, seed:int=0, random:bool=False): """ Initialize Perlin noise generator with specified parameters. Creates permutation table from seed and prepares tile-based structure for lazy noise generation. For small arrays, generates full noise during initialization. For larger arrays, sets up tile cache for on-demand generation. Parameters ---------- size : int Side length of square output array. Creates size x size noise map. scale : int, default=300 Base frequency scale factor. Determines the size of the largest features: - Larger scale (500-1000): Broad, rolling terrain - Medium scale (200-400): Balanced terrain (recommended) - Smaller scale (50-150): Fine, detailed features Formula: base_frequency = size / scale Recommended: scale = 0.2-0.5 x size for natural appearance. octaves : int, default=3 Number of noise frequencies to layer. Each adds detail at 2x frequency: - octaves=1: Single smooth layer (uniform hills) - octaves=2-4: Natural terrain detail (recommended) - octaves=5-7: High detail (rocky, complex) - octaves>7: Excessive detail, diminishing returns Computation cost scales linearly with octaves. persistence : float, default=0.5 Amplitude scaling between octaves. Controls fractal dimension: - persistence=0.3-0.4: Smooth, gentle terrain - persistence=0.5: Balanced (recommended) - persistence=0.6-0.7: Rough, dramatic terrain - persistence>0.7: Very chaotic, may look noisy Formula: amplitude_octave_i = persistence^i Valid range: [0, 1], typical range: [0.3, 0.7]. seed : int, default=0 Random number generator seed for permutation table: - Identical seeds produce identical noise patterns - Different seeds produce completely different patterns - seed=0 is valid (not treated specially) - Can be any integer (negative values accepted) Used to initialize numpy.random.default_rng(seed). random : bool, default=False Override seed parameter with system entropy: - True: Generates seed for unique pattern each run - False: Uses provided seed for reproducibility When True, final seed value stored in self.seed attribute. Attributes ---------- size : int Stored for reconstruction and reference. scale : int Stored for reconstruction and reference. octaves : int Stored for reconstruction and reference. persistence : float Stored for reconstruction and reference. seed : int Actual seed used (either provided or generated if random=True). rng : numpy.random.Generator RNG instance created from seed. p : ndarray, shape (512,), dtype=int Permutation table: shuffled [0-255] repeated twice. Used to hash grid coordinates to gradient vectors. noise : ndarray, shape (size, size), dtype=float64 Final normalized Perlin noise array in range [0, 1]. Notes ----- **Tile-Based Generation:** The noise array uses lazy tile-based generation for memory efficiency: - Size < _TILE_SIZE_PRE: Fully generated as a single tile during initialization - Size >= _TILE_SIZE_PRE: Divided into tiles, each generated on-demand - Tile size automatically calculated as divisor of size closest to _TILE_SIZE_STD - Un-generated tile regions of the noise array are prefilled with a neutral sentinel value (_TILE_SENTINEL) - On first access, tiles are generated and cached for reuse During initialization, only the tile management structure is set up. Full array generation occurs lazily through sample methods or explicitly with the generate() method. **Initialization Process:** 1. **Seed Handling:** - If random=True: Generate from system entropy - Store in self.seed, create RNG instance 2. **Permutation Table:** - Create [0-255] array, shuffle, repeat to length 512 - Stored in self.p for gradient selection 3. **Normalization Bounds:** - Generates 1000x1000 test array to measure Perlin range - Computes empirical [min, max] for deterministic normalization - Ensures consistent [0, 1] bounded output across all tiles 4. **Tile Management Setup:** - Arrays < 1000: Single tile (full precomputation) - Arrays >= 1000: Multiple tiles (lazy generation) - Tile size = closest divisor to 500 that evenly divides size - Noise array prefilled with sentinel value (0.5) 5. **Lazy Evaluation Ready:** - No actual noise values computed during __init__ - Generation triggered by: sample(), sampleRegion(), or generate() - Tiles cached on generation for reuse **Tile Size Selection:** Automatically balances tile count vs tile size: - Target tile size: ~500x500 for good response time - Constraint: Must evenly divide array size - Example: 2000x2000 array produces a 4x4 grid of 500x500 tiles Examples -------- ### Minimal initialization: >>> pn = PerlinNoise(size=1000) >>> # Uses defaults: scale=300, octaves=3, persistence=0.5, seed=0 ### Custom parameters: >>> pn = PerlinNoise( ... size=2000, ... scale=400, # Larger features ... octaves=5, # More detail ... persistence=0.6, # Rougher ... seed=12345 ... ) ### Random terrain each run: >>> pn = PerlinNoise(size=1000, random=True) >>> print(f"Generated with seed: {pn.seed}") Generated with seed: 161803398874989484820458683436563811772 ### Reproducible terrain: >>> pn1 = PerlinNoise(size=500, seed=99) >>> pn2 = PerlinNoise(size=500, seed=99) >>> np.array_equal(pn1.noise, pn2.noise) True """ # Store parameters self.size = size self.scale = scale self.octaves = octaves self.persistence = persistence self.seed=seed if (random): self.seed = np.random.SeedSequence().entropy # Initialize noise generation self.rng = np.random.default_rng(seed=self.seed) self.p = self._calcPermutationTable() self.noise = np.full((size, size), self._TILE_SENTINEL, dtype=np.float64) self._min, self._diff = self._calcNormalizationBounds() # Tile management self._tiles = {} self._tileSize = self._calcTileSize() self._tileN = size // self._tileSize # Precompute small array if (size < self._TILE_SIZE_PRE): self._genTile(0, 0)
## Special Methods =======================================================#
[docs] def __call__(self, i:int, j:int)->np.float64: """ Sample Perlin noise at position (i, j). Alias for sample() method. Makes PerlinNoise instance directly callable for convenient point sampling. Delegates to sample() method which provides tile-based caching for efficient queries. Evaluates and caches tile containing (i, j) on first access. Parameters ---------- i : int Column index to sample (x direction, horizontal). Ranges [0, size). Wraps by modulo of size if out of bounds. j : int Row index to sample (y direction, vertical). Ranges [0, size). Wraps by modulo of size if out of bounds. Returns ------- noise : float Perlin noise value at noise[j, i] in range [0, 1]. Notes ----- **Callable Convenience:** Enables both function call syntax and method alias: >>> pn = PerlinNoise(size=100, seed=42) >>> val1 = pn(50, 50) # Query by __call__() >>> val2 = pn.sample(50, 50) # Query by sample() >>> val1 == val2 True **Tile Caching:** First query within a tile triggers generation and caching. Subsequent queries in the same tile return instantly from cache. **Index Convention:** Following numpy convention: - i: Column index (horizontal, x direction) - j: Row index (vertical, y direction) - Access pattern: noise[j, i] (row-major ordering) See Also -------- sample : Implementation with detailed caching explanation evaluate : Direct evaluation without caching Examples -------- >>> pn = PerlinNoise(size=1000, seed=42) >>> # Simple point query >>> value = pn(500, 500) >>> print(f"Noise at (500, 500): {value:.3f}") Noise at (500, 500): 0.485 >>> # Loop over multiple points >>> values = [pn(i, i) for i in range(0, 100, 10)] >>> print(f"Diagonal values: {values}") """ return self.sample(i, j)
#--------------------------------------------------------------------------
[docs] def __repr__(self)->str: """Detailed description of PerlinNoise.""" # Note that rng, p, and noise can be recreated from these parameters return ( f"{self.__class__.__name__}(" f"size={self.size}, " f"scale={self.scale}, " f"octaves={self.octaves}, " f"persistence={self.persistence}, " f"seed={self.seed})" )
#--------------------------------------------------------------------------
[docs] def __str__(self)->str: """User friendly description of PerlinNoise.""" cw = 16 return ( f"Perlin\n" f"{' Scale:':{cw}} {self.scale}\n" f"{' Octaves:':{cw}} {self.octaves}\n" f"{' Persistence:':{cw}} {self.persistence}\n" f"{' Seed:':{cw}} {self.seed}\n" )
## Methods ===============================================================#
[docs] def evaluate(self, i:int, j:int)->np.float64: """ Evaluate Perlin noise at position (i, j) without caching. Computes noise value by direct evaluation without tile caching. Internally calls _evaluateTile() for 1x1 area. Suitable for one-time queries where caching overhead is undesirable. Parameters ---------- i : int Column index (x direction, horizontal). j : int Row index (y direction, vertical). Returns ------- noise : float Perlin noise value at noise[j, i] in range [0, 1]. Notes ----- **Direct Evaluation:** No caching performed, the value is computed fresh each call. More memory-efficient than sample() for sparse, non-repetitive queries. **Performance Comparison:** For repeated queries to same region, sample() is faster due to tile-based caching. For widely scattered queries, evaluate() may be faster and more memory-efficient. **Index Convention:** - i: Column index (horizontal, x direction) - j: Row index (vertical, y direction) **When to Use:** Prefer evaluate() over sample() when: - Single-use queries (no caching overhead) - Memory-constrained (no tile storage) - Deterministic evaluation order needed Prefer sample() when: - Querying same region multiple times (caching benefit) - Sparse sampling patterns (lazy generation benefit) - Memory constraints allow caching (tiles persist) See Also -------- evaluateRegion : Region evaluation without caching sample : Single-point evaluation with tile caching Examples -------- >>> pn = PerlinNoise(size=1000, seed=42) >>> value = pn.evaluate(500, 500) >>> print(f"Noise at (500, 500): {value:.3f}") Noise at (500, 500): 0.278 """ return self._evaluateTile([i, i+1], [j, j+1])[0, 0]
#--------------------------------------------------------------------------
[docs] def evaluateRegion(self, i:Union[List[int], NPIntArr], j:Union[List[int], NPIntArr], )->NPFltArr: """ Evaluate Perlin noise in region (iMin:iMax, jMin:jMax) without caching. Computes multi-octave Perlin noise over the specified region using direct evaluation with automatic chunking for large regions. Does not cache intermediate results or tiles. Provides memory-efficient region evaluation when caching is unnecessary of undesirable. Parameters ---------- i : list of int or ndarray Column index bounds [iMin, iMax]. Defines horizontal extent of region. iMax is exclusive: [0, 100] returns columns 0-99 (100 columns total). j : list of int or ndarray Row index bounds [jMin, jMax]. Defines vertical extent of region. jMax is exclusive: [0, 100] returns rows 0-99 (100 rows total). Returns ------- noise : ndarray, shape (jMax-jMin, iMax-iMin) 2D array of Perlin noise values in range [0, 1]. Note: Array indexing is [row, column] = [j, i]. Notes ----- **Direct Evaluation Algorithm:** Method performs full Perlin noise generation without caching: 1. Extract region bounds: iMin, iMax, jMin, jMax 2. Calculate region dimensions: width = iMax - iMin, height = jMax - jMin 3. Chunking check: a. If max(width, height) < _TILE_SIZE_PRE: - Process entire region as single call b. If max(width, height) >= _TILE_SIZE_PRE: - Calculate tile size as divisor closest to _TILE_SIZE_STD - Divide region into grid of tiles - Process each tile sequentially - Assemble results into output array 4. Per-Tile Processing: a. Compute octave frequency: currFreq = (size/scale) * 2^octave b. Compute octave amplitude: currAmp = persistence^octave c. Map tile bounds to frequency space coordinates d. Create coordinate meshgrids with np.linspace() e. Evaluate Perlin noise at all grid points f. Scale by amplitude and accumulate: noise += currNoise * currAmp 5. Normalize accumulated noise to [0, 1] 6. Return normalized region array **No Caching:** Unlike sample() and sampleRegion(), this method: - Does not check tile generation status - Does not write results to self.noise array - Does not mark tiles as generated in _tiles dictionary - Recomputes values on every call (no memoization) **Tile Processing:** For large regions (max dimension >= _TILE_SIZE_PRE), the method automatically divides the input into tiles and processes each tile sequentially to prevent memory issues. Results are assembled seamlessly into final output array. **Index Convention:** Array uses standard numpy row-major ordering: - i: Column index (horizontal, "x" direction) - j: Row index (vertical, "y" direction) - Returned array: noise[j_height, i_width] **Frequency Space Mapping:** Region bounds transformed to Perlin frequency coordinates: >>> iStart = (iMin / size) * currFreq >>> iEnd = (iMax / size) * currFreq >>> jStart = (jMin / size) * currFreq >>> jEnd = (jMax / size) * currFreq Creates meshgrid spanning [start, end) with width/height points. **Octave Accumulation:** Each octave contributes at increasing frequency, decreasing amplitude: - Octave 0: freq = size/scale, amp = 1.0 - Octave 1: freq = 2*size/scale, amp = persistence - Octave 2: freq = 4*size/scale, amp = persistence^2 - Octave i: freq = 2^i*size/scale, amp = persistence^i **When to Use:** Prefer evaluateRegion() over sampleRegion() when: - Single-use query (no repeated access to same region) - Memory-constrained (cannot afford tile storage) - Small regions (caching overhead not justified) - Avoiding cache pollution for rarely-accessed areas Prefer sampleRegion() when: - Repeated queries to same/overlapping regions - Sufficient memory for tile caching - Building up coverage of domain over time - Pre-generating full noise array by generate() See Also -------- evaluate : Single-point evaluation without caching sampleRegion : Region evaluation with tile caching sample : Single-point evaluation with tile caching _perlin : Core Perlin noise function _normalize : Normalization to [0, 1] _evaluateTile : Core tile evaluation implementation _calcTileSize : Tile size determination algorithm Examples -------- ### Evaluate rectangular region: >>> pn = PerlinNoise(size=1000, seed=42) >>> region = pn.evaluateRegion([100, 200], [150, 250]) >>> print(region.shape) (100, 100) # [jMax-jMin, iMax-iMin] >>> print(f"Range: [{region.min():.3f}, {region.max():.3f}]") Range: [0.249, 0.516] ### One-time subregion extraction: >>> pn = PerlinNoise(size=1000, seed=42) >>> # Extract small region for analysis >>> corner = pn.evaluateRegion([0, 50], [0, 50]) >>> center = pn.evaluateRegion([475, 525], [475, 525]) >>> # No persistent cache, suitable for sparse one-off queries """ # Bounds iMin, iMax = i jMin, jMax = j width = iMax - iMin height = jMax - jMin # Check if region is small enough to process in one tile maxDim = max(width, height) if (maxDim < self._TILE_SIZE_PRE): return self._evaluateTile(iMin, iMax, jMin, jMax) # Large region: divide into tiles tileSize = self._calcTileSize(maxDim) noise = np.zeros((height, width), dtype=np.float64) jTile = jMin while (jTile < jMax): jTileEnd = min(jTile + tileSize, jMax) jOffset = jTile - jMin jTileHeight = jTileEnd - jTile iTile = iMin while (iTile < iMax): iTileEnd = min(iTile + tileSize, iMax) iOffset = iTile - iMin iTileWidth = iTileEnd - iTile # Evaluate this tile tileNoise = self._evaluateTile( iTile, iTileEnd, jTile, jTileEnd ) # Place tile in output array noise[jOffset:jOffset+jTileHeight, iOffset:iOffset+iTileWidth] = tileNoise iTile = iTileEnd jTile = jTileEnd return noise
#--------------------------------------------------------------------------
[docs] def sample(self, i:int, j:int)->np.float64: """ Sample Perlin noise at position (i, j) and cache result. Returns noise value at specified position. Checks if the region of the noise array containing the position has already been generated. If not, the tile containing the point is computed and cached before returning the value. Provides efficient single-point evaluation with automatic tile management for lazy generation. Parameters ---------- i : int Column index (x direction, horizontal). Valid range: [0, size). j : int Row index (y direction, vertical). Valid range: [0, size). Returns ------- noise : float Perlin noise value at noise[j, i] in range [0, 1]. Notes ----- **Tile-Based Caching:** Method uses lazy evaluation strategy with tile-based caching: 1. Check if position's tile has been generated with _isGenerated() 2. If not cached: - Calculate tile indices: tileI = i // tileSize, tileJ = j // tileSize - Generate and cache tile with _genTile(tileI, tileJ) - Mark tile as generated in _tiles dictionary 3. Return value from cached array: noise[j, i] **Index Convention:** Array uses standard numpy row-major ordering: - i: Column index (horizontal, "x" direction) - j: Row index (vertical, "y" direction) - Array access: noise[j, i] = noise[row, column] **Tile Size:** Tile dimensions determined automatically during initialization: - size < _TILE_SIZE_PRE: Single tile (entire array) - size >= _TILE_SIZE_PRE: Multiple tiles of size-divisor nearest to _TILE_SIZE_STD **When to Use:** Prefer sample() over evaluate() when: - Querying same region multiple times (caching benefit) - Sparse sampling patterns (lazy generation benefit) - Memory constraints allow caching (tiles persist) Prefer evaluate() when: - Single-use queries (no caching overhead) - Memory-constrained (no tile storage) - Deterministic evaluation order needed See Also -------- sampleRegion : Region evaluation with tile caching evaluate : Single-point evaluation without caching evaluateRegion : Direct region evaluation without caching _genTile : Tile generation implementation _isGenerated : Tile status check Examples -------- ### Sample single point: >>> pn = PerlinNoise(size=1000, seed=42) >>> value = pn.sample(500, 500) >>> print(f"Noise at (500, 500): {value:.3f}") Noise at (500, 500): 0.278 ### Efficient repeated sampling: >>> pn = PerlinNoise(size=2000, seed=42) >>> # First access generates tile containing (1000, 1000) >>> val1 = pn.sample(1000, 1000) >>> # Second access reuses cached tile (instant) >>> val2 = pn.sample(1010, 1010) # Same tile >>> # Both queries benefit from single tile generation ### Sparse sampling pattern: >>> pn = PerlinNoise(size=5000, seed=99) >>> # Sample widely separated points >>> points = [(500, 500), (1500, 2000), (3000, 4000)] >>> samples = [pn.sample(i, j) for i, j in points] >>> # Only generates 3 tiles instead of full 5000 x 5000 array """ if (not self._isGenerated(i, j)): tileI = i // self._tileSize tileJ = j // self._tileSize self._genTile(tileI, tileJ) return self.noise[j, i] # [row, column]
#--------------------------------------------------------------------------
[docs] def sampleRegion(self, i:Union[List[int], NPIntArr], j:Union[List[int], NPIntArr], )->NPFltArr: """ Sample Perlin noise in region (iMin:iMax, jMin:jMax) and cache result. Evalutes Perlin noise across the specified region and caches any tiles that contain the region. Returns the requested slice from the cached noise array. Provides efficient access to subregions of noise array without redundant computation. Parameters ---------- i : list of int or ndarray Column index bounds [iMin, iMax]. Defines horizontal extent of region. iMax is excluded: [0, 100] returns columns 0 to 99 (100 total columns). j : list of int or ndarray Row index bounds [jMin, jMax]. Defines vertical extent of region. jMax is excluded: [0, 100] returns rows 0 to 99 (100 total rows). Returns ------- noise : ndarray, shape (jMax-jMin, iMax-iMin) 2D array of Perlin noise values in range [0, 1]. Note: Array indexing is [row, column] = [j, i]. Notes ----- **Tile-Based Caching:** Method determines which tiles overlap the requested region and generates any missing tiles before returning the result: 1. Compute tile grid indices spanning region: - tileIMin = iMin // tileSize - tileIMax = (iMax - 1) // tileSize - tileJMin = jMin // tileSize - tileJMax = (jMax - 1) // tileSize 2. Generate missing tiles with _genTile() for each (tileI, tileJ) 3. Return region slice: noise[jMin:jMax, iMin:iMax] **Tile Boundary Handling:** Regions spanning multiple tiles are handled seamlessly: - Each overlapping tile computed independently - Tiles cached separately in _tiles dictionary - Final slice extracted from unified noise array - No duplication of work for overlapping regions **Index Convention:** Array uses standard numpy row-major ordering: - i: Column index (horizontal, x direction) - j: Row index (vertical, y direction) - Returned array: [j_height, i_width] **Performance Characteristics:** - First call: Generates and caches required tiles - Subsequent calls: Returns cached values (instant) - Tile generation scales with region area - Overhead: Dictionary lookups for tile status checks See Also -------- sample : Single-point evaluation with tile caching evaluateRegion : Direct evaluation without caching _genTile : Tile generation implementation generate : Evaluate and cache all tiles Examples -------- ### Extract subregion from larger array: >>> pn = PerlinNoise(size=2000, seed=99) >>> # Get central 500x500 region >>> center = pn.sampleRegion([750, 1250], [750, 1250]) >>> print(center.shape) (500, 500) ### Efficient repeated access: >>> pn = PerlinNoise(size=1000, seed=42) >>> # First call generates tiles >>> region1 = pn.sampleRegion([0, 300], [0, 300]) >>> # Second call reuses cached tiles (fast) >>> region2 = pn.sampleRegion([100, 400], [100, 400]) >>> # Overlapping region uses cached data """ iMin, iMax = i jMin, jMax = j # Determine which tiles overlap this region tileIMin = iMin // self._tileSize tileIMax = (iMax - 1) // self._tileSize tileJMin = jMin // self._tileSize tileJMax = (jMax - 1) // self._tileSize # Generate any missing tiles for tileI in range(tileIMin, tileIMax + 1): for tileJ in range(tileJMin, tileJMax + 1): if not self._tiles.get((tileI, tileJ), False): self._genTile(tileI, tileJ) # Return requested region from cached array return self.noise[jMin:jMax, iMin:iMax]
#--------------------------------------------------------------------------
[docs] def generate(self)->None: """ Generate and cache Perlin noise for entire array. Evaluates and caches noise values across the complete size x size domain. After calling this method, the noise array contains valid Perlin values at all positions (no sentinel values remain). Notes ----- For arrays with size < _TILE_SIZE_PRE, generation occurs automatically during __init__. This method is for larger arrays using lazy, on-demand tile generation. Useful for pre-computing the full noise array before use to avoid on-demand generation overhead during runtime, or for plotting and display of the noise array. See Also -------- sample : On-demand single value evaluation with tile caching evaluateRegion : Direct evaluation of 2d area without caching """ for tileI in range(self._tileN): for tileJ in range(self._tileN): if not self._tiles.get((tileI, tileJ), False): self._genTile(tileI, tileJ)
## Helper Methods ========================================================#
[docs] def _calcPermutationTable(self)->NPIntArr: """ Generate permutation table for Perlin noise gradient vector selection. Creates a hash table by shuffling integers [0-255] and repeating twice to create a 512-element lookup array. This table maps grid coordinates to pseudo-random gradient vectors, providing the spatial coherence characteristic of Perlin noise. Returns ------- p : ndarray, shape (512,), dtype=int Permutation table containing [0-255] shuffled and repeated. Used to hash grid coordinates: p[p[xi] + yi] -> gradient index. Notes ----- **Algorithm:** 1. Create sequential array [0, 1, 2, ..., 255] 2. Shuffle using self._rng (seeded RNG for reproducibility) 3. Repeat shuffled array: [shuffled, shuffled] -> length 512 **Purpose:** The permutation table provides deterministic pseudo-random access to gradient vectors. Doubling to length 512 eliminates need for modulo operations when indexing: >>> hash_value = self.p[self.p[xi] + yi] # No % 256 needed **Gradient Mapping:** Hash values from permutation table select gradient vectors: - h = p[p[xi] + yi] determines which gradient from 4 options: - h % 4 = 0: [0, 1] (North) - h % 4 = 1: [0, -1] (South) - h % 4 = 2: [1, 0] (East) - h % 4 = 3: [-1, 0] (West) See Also -------- _perlin : Uses permutation table for gradient selection _gradient : Converts hash values to gradient vectors """ p = np.arange(256, dtype=int) self.rng.shuffle(p) return np.concatenate([p, p])
#--------------------------------------------------------------------------
[docs] def _calcNormalizationBounds(self)->float: """ Calculate empirical bounds of Perlin noise for on-demand normalization. Generates a noise array of 1000x1000 with instance input parameters and measures the actual min and max range to set normalization bounds. This empirical approach accounts for variations in unique parameter combinations combined with the permutation table randomization. Provides consistent, deterministic normalization for on-demand noise generation. Returns ------- min : float Empirical minimum observed in raw Perlin noise test array. diff : float Empirical range (max - min) observed in raw Perlin noise test array. Notes ----- **Purpose:** Perlin noise does not produce values exactly in the theoretical range [-amplitude, amplitude]. Empirical bounds are narrower due to: - Limited gradient vectors - Dot product geometry reducing magnitude - Interpolation smoothing dampening extremes Pre-computing empirical bounds allows consistent normalization: .. code-block:: python normalized = (noise - min_val) / diff # Always maps to [0, 1] regardless of tile **Why This Works:** The empirical bounds are a constant property of the algorithm, not the data. For given (octaves, persistence), the bounds are identical across: - Different seeds (only affects pattern, not range) - Different array sizes (frequency scaling cancels out) - Different tiles (all use same Perlin function) This enables deterministic, tile-order-independent normalization without requiring two-pass generation or global post-processing. **Sampling Strategy:** Generates 1000x1000 test region (1M points) to measure Perlin bounds: 1. Accumulates noise across all octaves 2. Measures min/max of accumulated raw values 3. Returns (min, max-min) for normalization formula See Also -------- _normalize : Uses these bounds for consistent normalization evaluateRegion : Generates raw noise, then normalizes with bounds __init__ : Calls this method once during initialization """ # Generate small test region to measure actual Perlin bounds testSize = 1000 testNoise = np.zeros((testSize, testSize), dtype=np.float64) # Base frequency freq = self.size / self.scale # Accumulate test noise across all octaves for octave in range(self.octaves): currFreq = freq * (2.0 ** octave) currAmp = self.persistence ** octave # Sample region in frequency space stop = currFreq * (testSize / self.size) iCoords = np.linspace(0, stop, testSize, endpoint=False) jCoords = np.linspace(0, stop, testSize, endpoint=False) iMesh, jMesh = np.meshgrid(iCoords, jCoords) # Evaluate this octave testNoise += self._perlin(iMesh, jMesh) * currAmp min = testNoise.min() diff = testNoise.max() - min return min, diff
#--------------------------------------------------------------------------
[docs] def _perlin(self, x:Union[Number, NPFltArr], y:Union[Number, NPFltArr], )->NPFltArr: """ Compute 2D Perlin noise at specified coordinates. Core Perlin noise algorithm that uses permutation table to select gradient vectors, computes dot products with coordinate offsets, and interpolates results using smooth fade functions. Accepts both scalar coordinates and array inputs. Parameters ---------- x : float or ndarray X coordinate(s) scaled by frequency. Can be a single scalar value or an array. y : float or ndarray Y coordinate(s) scaled by frequency. Can be a single scalar value or an array. Returns ------- noise : float or ndarray Raw Perlin noise values (not normalized). Return type matches input: scalar inputs return scalar output, array inputs return array output. Range varies but typically [-1, 1] before normalization. Notes ----- **Algorithm:** 1. Grid cell identification by floor division 2. Internal coordinates by fractional component 3. Gradient selection by permutation table hashing 4. Dot products with corner gradients 5. Smooth fade function application 6. Bilinear interpolation of corner values **Gradient Hashing:** Permutation table provides pseudo-random but deterministic gradient selection: - Same (xi, yi) always maps to same gradient (for given seed) - Different seeds produce completely different gradient patterns **Interpolation Quality:** Uses improved Perlin fade function (6t^5 - 15t^4 + 10t^3) for: - Smooth first derivative (no visible grid artifacts) - Smooth second derivative (no curvature discontinuities) - Implemented with Horner's method for polynomial evaluation **Scalar vs Array Inputs:** Method accepts both scalar and array coordinates: - Scalar inputs (x=float, y=float): Returns single noise value - Array inputs (x=ndarray, y=ndarray): Returns noise array matching input shape - Useful for both single-point queries and full map generation See Also -------- _fade : Smoothing function for interpolation weights _gradient : Gradient vector selection and dot product _evaluateTile : Calls this method to accumulate multi-octave noise Examples -------- ### Single point query (scalar inputs): >>> pn = PerlinNoise(size=100, seed=42) >>> noise_value = pn._perlin(5.3, 7.8) >>> print(f"Noise at (5.3, 7.8): {noise_value:.3f}") Noise at (5.3, 7.8): -0.134 ### Array-based generation (typical usage): >>> pn = PerlinNoise(size=100, seed=42) >>> # Create coordinate meshgrid >>> x = np.linspace(0, 10, 100) >>> X, Y = np.meshgrid(x, x) >>> # Generate single-octave noise >>> noise = pn._perlin(X, Y) >>> print(noise.shape) (100, 100) """ # Convert scalar inputs to numpy arrays x = np.asarray(x) y = np.asarray(y) # Grid cell coordinates xi = np.floor(x).astype(int) yi = np.floor(y).astype(int) xf = x - xi yf = y - yi # Hash the grid corners to permutation table g00 = self.p[self.p[ xi & 255] + ( yi & 255)] g01 = self.p[self.p[ xi & 255] + ((yi + 1) & 255)] g10 = self.p[self.p[(xi + 1) & 255] + ( yi & 255)] g11 = self.p[self.p[(xi + 1) & 255] + ((yi + 1) & 255)] # Dot product with gradients n00 = self._gradient(g00, xf , yf ) n01 = self._gradient(g01, xf , yf - 1) n10 = self._gradient(g10, xf - 1, yf ) n11 = self._gradient(g11, xf - 1, yf - 1) # Fade factors u = self._fade(xf) v = self._fade(yf) # Combine noise components using smoothing factors """First combine lefts and rights, then top with bottom""" x1 = n00 * (1 - u) + n10 * u x2 = n01 * (1 - u) + n11 * u return x1 * (1 - v) + x2 * v
#--------------------------------------------------------------------------
[docs] def _fade(self, t:NPFltArr)->NPFltArr: """ Smoothing function for Perlin noise interpolation. Applies improved Perlin fade curve (6t^5 - 15t^4 + 10t^3) to create smooth interpolation weights. Ensures C^2 continuity (smooth curvature) at grid cell boundaries, eliminating visible artifacts in generated noise. Parameters ---------- t : ndarray Interpolation parameter(s) in range [0, 1]. Typically internal coordinates (xf, yf) within grid cell. Returns ------- smoothed : ndarray Smoothed interpolation weights, same shape as input. Values in range [0, 1] with smooth derivatives. Notes ----- **Mathematical Form:** fade(t) = 6t^5 - 15t^4 + 10t^3 - **Properties:** - f(0) = 0 (zero at start) - f(1) = 1 (one at end) - f'(0) = 0 (zero first derivative at start) - f'(1) = 0 (zero first derivative at end) - f''(0) = 0 (zero second derivative at start) - f''(1) = 0 (zero second derivative at end) - **Derivatives:** - First derivative: f'(t) = 30t^4 - 60t^3 + 30t^2 = 30t^2(t - 1)^2 - Second derivative: f''(t) = 120t^3 - 180t^2 + 60t = 60t(2t - 1)(t - 1) **Horner's Method Implemented:** Uses Horner's method for efficient polynomial evaluation, performing multiplications instead of exponentiation. Reduces computation from 3 exponentiations and 2 multiplications to 6 multiplications. Provides faster execution and minimized floating-point errors. See Also -------- _perlin : Uses fade for interpolation weights u and v Examples -------- ### Fade curve shape: >>> import matplotlib.pyplot as plt >>> pn = PerlinNoise(size=100) >>> t = np.linspace(0, 1, 100) >>> fade_vals = pn._fade(t) >>> plt.plot(t, fade_vals, label='fade(t)') >>> plt.plot(t, t, '--', label='linear(t)') >>> plt.legend() >>> plt.xlabel('t') >>> plt.ylabel('weight') >>> plt.title('Improved Perlin Fade Curve') >>> plt.show() ### Derivative verification: >>> t = np.array([0.0, 0.5, 1.0]) >>> fade_t = pn._fade(t) >>> print(fade_t) [0. 0.5 1. ] # Smooth interpolation from 0 to 1 """ return t * t * t * (t * (t * 6 - 15) + 10)
#--------------------------------------------------------------------------
[docs] def _gradient(self, h:Union[int, NPIntArr], xf:Union[Number, NPFltArr], yf:Union[Number, NPFltArr], )->NPFltArr: """ Select gradient vector from hash and compute dot product with offset. Converts hashed grid coordinate to one of four unit gradient vectors and computes dot product with internal cell coordinates. This is the core operation that gives Perlin noise its characteristic coherent randomness. Parameters ---------- h : int or ndarray of int Hash value(s) from permutation table: h = p[p[xi] + yi]. Used to select gradient vector by modulo operation. Can be single integer or array of hash values. xf : float or ndarray X-offset from left edge of grid cell, range [0, 1). Represents fractional position within cell. Can be scalar or array. yf : float or ndarray Y-offset from top edge of grid cell, range [0, 1). Can be scalar or array. Returns ------- dot_product : float or ndarray Dot product of selected gradient with (xf, yf) offset vector. Return type matches input: scalar inputs return scalar, array inputs return array. Values typically in range [-1, 1] before interpolation. Notes ----- **Gradient Vector Selection:** Four possible unit gradient vectors selected by h % 4: .. code-block:: none h % 4 = 0: [0, 1] -> dot = yf (North-pointing) h % 4 = 1: [0, -1] -> dot = -yf (South-pointing) h % 4 = 2: [1, 0] -> dot = xf (East-pointing) h % 4 = 3: [-1, 0] -> dot = -xf (West-pointing) - **Dot Product Computation:** g * d = g_x * xf + g_y * yf where: - g: Gradient vector (from vectors array) - d: Displacement vector (xf, yf) **Why Only 4 Gradients:** Original Perlin noise used 8 or 12 gradients, but Perlin (2002) showed 4 gradients sufficient for 2D noise with proper hashing: - Simpler implementation - Faster computation - Visually indistinguishable from 8/12 gradient versions **Gradient Distribution:** Permutation table ensures pseudo-random but uniform distribution: - Each gradient appears ~25% of the time (on average) - No directional bias in generated terrain - Spatial coherence from grid structure, not gradient distribution **Hash to Gradient Mapping:** The modulo operation (h % 4) effectively reduces 8-bit hash (0-255) to 2-bit gradient index (0-3): - Throws away most hash bits (intentional) - Remaining bits sufficient for gradient selection - Preserves spatial coherence from permutation table **Array Broadcasting:** Function handles both scalar and array inputs by numpy broadcasting: - h, xf, yf can be single values or meshgrids - Vectorized operations over entire coordinate arrays - No explicit loops needed See Also -------- _perlin : Calls _gradient for each grid cell corner _calcPermutationTable : Generates hash values used here References ---------- [1] Perlin, K. (2002). "Improving Noise." ACM SIGGRAPH 2002. Explains gradient selection and reduction to 4 vectors for 2D. Examples -------- ### Single gradient selection (scalar inputs): >>> pn = PerlinNoise(size=100, seed=42) >>> h = 7 # Example hash value >>> xf, yf = 0.3, 0.7 # Internal coordinates >>> dot = pn._gradient(h, xf, yf) >>> print(f"Gradient index: {h % 4}, Dot product: {dot:.3f}") Gradient index: 3, Dot product: -0.300 ### Vectorized gradient computation (array inputs): >>> h_array = np.array([0, 1, 2, 3]) >>> xf_array = np.full(4, 0.5) >>> yf_array = np.full(4, 0.5) >>> dots = pn._gradient(h_array, xf_array, yf_array) >>> print(dots) [ 0.5 -0.5 0.5 -0.5] # yf, -yf, xf, -xf """ g = self._GRADIENTS[h % 4] # Handle scalar and array inputs if g.ndim == 1: # Scalar case: g is shape (2,) return (g[0] * xf) + (g[1] * yf) else: # Array case: g is shape (..., 2) return (g[..., 0] * xf) + (g[..., 1] * yf)
#--------------------------------------------------------------------------
[docs] def _normalize(self, n:NPFltArr)->NPFltArr: """ Normalize array to range [0, 1] using empirically corrected amplitude. Applies pre-computed normalization bounds (min, max-min) to map raw Perlin noise values to standard [0, 1] range. Ensures consistent normalization across all tiles and evaluation methods regardless of generation order. Parameters ---------- n : ndarray Raw Perlin noise before normalization. Returns ------- normalized : ndarray Array with values linearly scaled to [0, 1], same shape as input. Clipped to ensure exact bounds. Notes ----- **Formula:** normalized = (n - min) / diff where: - min: Empirical minimum - diff: Empirical range (max - min) **Clipping Rationale:** `np.clip(normalized, 0.0, 1.0)` handles edge cases: - Floating-point rounding errors near bounds - Extreme values in rare octave phase alignments - Numerical precision limits in large arrays In practice, clipping affects <0.5% of values and ensures exact [0, 1] range for downstream consumers (Floor, etc.). **Why Normalization:** Raw Perlin noise has inconsistent value ranges: - Single octave: typically [-0.7, +0.7] - Multiple octaves: range expands with octave count - Amplitude depends on persistence parameter Normalization ensures: - Consistent [0, 1] output range - Full utilization of available range - Predictable behavior for terrain generation See Also -------- _calcNormalizationBounds : Computes min_val and diff during initialization evaluateRegion : Generates raw noise, then normalizes with this method __init__ : Calls _calcNormalizationBounds() to set self._min, self._diff """ normalized = (n - self._min) / self._diff return np.clip(normalized, 0.0, 1.0)
#--------------------------------------------------------------------------
[docs] def _calcTileSize(self, size:Optional[int]=None)->int: """ Determine optimal tile size that evenly divides the array. Calculates a tile dimension that is either the entire array size for arrays smaller than _TILE_SIZE_PRE, or is the divisor of size with no remainder that is closest to _TILE_SIZE_STD for large arrays. Parameters ---------- size : int, optional Array side length. If None, uses self.size. Returns ------- tileSize : int Tile dimension that evenly divides size Notes ----- **Sizing Strategy:** - If size < _TILE_SIZE_PRE: Return size (entire array is one tile) - If size >= _TILE_SIZE_PRE: Find divisor closest to _TILE_SIZE_STD **Algorithm:** 1. Find all divisors of size: d where size % d == 0 2. Among divisors, select one closest to _TILE_SIZE_STD 3. This minimizes memory-generation time tradeoff **Tile-to-Grid Mapping:** Number of tiles in each dimension: numTiles = ceil(size / tileSize) - size=1000, tileSize=500 -> 2x2 grid -> 4 tiles - size=2000, tileSize=500 -> 4x4 grid -> 16 tiles - size=5000, tileSize=500 -> 10x10 grid -> 100 tiles See Also -------- _genTile : Generates individual tile of calculated size _isGenerated : Checks generation status using this tile size sampleRegion : Uses cached tiling for region queries evaluateRegion : Uses tiled computation for region queries Examples -------- >>> pn = PerlinNoise(size=1000, seed=42) >>> pn._calcTileSize() 500 >>> pn._calcTileSize(size=1700) 425 >>> pn._calcTileSize(size=1024) 512 """ size = self.size if (size is None) else size # Less than TILE_SIZE_PRE: tileSize = size if (size < self._TILE_SIZE_PRE): return size # Find divisors of size divisors = [] for i in range(1, int(np.sqrt(size)) + 1): if (size % i == 0): divisors.append(i) divisors.append(size // i) # duplicate if square is fine return min(divisors, key=lambda x: abs(x - self._TILE_SIZE_STD))
#--------------------------------------------------------------------------
[docs] def _isGenerated(self, i:int, j:int)->bool: """ Check if the tile containing position at (i,j) has been generated. Converts array position to tile grid index and queries the internal tile cache dictionary. Returns generation status without triggering tile computation. Used to determine if lazy evaluation is needed. Parameters ---------- i : int Column index (x direction, horizontal). Valid range: [0, size). j : int Row index (y direction, vertical). Valid range: [0, size). Returns ------- isGenerated : bool True if the tile containing position (i, j) has been generated and cached. Notes ----- **Tile Grid Mapping:** Converts position to tile indices by integer division: >>> tileI = i // self._tileSize >>> tileJ = j // self._tileSize Tiles are indexed by (tileI, tileJ) in self._tiles dictionary. **Cache Structure:** self._tiles dictionary stores boolean flags: - Key: (tileI, tileJ) tuple - Value: True if generated, False or missing if not Noise data itself is stored in self.noise array, not in _tiles dict. The _tiles dictionary only tracks generation status for lazy evaluation. See Also -------- sample : Uses _isGenerated() to determine if tile generation needed _genTile : Generates tile and marks as generated in _tiles Examples -------- >>> pn = PerlinNoise(size=2000, seed=42) >>> # Check position in center >>> pn._isGenerated(1000, 1000) False # Large array, not yet generated >>> >>> # Trigger generation with sample call >>> _ = pn.sample(1000, 1000) >>> >>> # Check again >>> pn._isGenerated(1000, 1000) True # Tile now cached >>> >>> # Nearby position in same tile also marked generated >>> pn._isGenerated(1050, 1050) True # Same tile (if tileSize > 50) >>> >>> # Inspect tile dictionary >>> print(pn._tiles) {(2, 2): True} """ tileI = i // self._tileSize tileJ = j // self._tileSize return self._tiles.get((tileI, tileJ), False)
#--------------------------------------------------------------------------
[docs] def _genTile(self, tileI:int, tileJ:int)->None: """ Generate and cache a tile of the noise array at grid position (tileI, tileJ). Evaluates the multi-octave Perlin noise for the specified tile region and writes results to the corresponding area of the noise array. Marks tile as generated in cache dictionary to prevent redundant computation. Facilitates lazy evaluation. Parameters ---------- tileI : int Tile grid column index ("x" direction, horizontal). Valid range: [0, _tileN). tileJ : int Tile grid row index ("y" direction, vertical). Valid range: [0, _tileN). Notes ----- **Tile Generation Process:** 1. Check if tile is already generated by look-up in _tiles dictionary 2. Convert tile grid indices to array slice bounds 3. Generate normalized Perlin noise for region by _evaluateTile() 4. Copy tile data to noise array, overwriting sentinel values 5. Update cache dictionary to prevent recomputation **Tile Management:** Tile data stored in two places: 1. self.noise[jMin:jMax, iMin:iMax]: Actual noise values 2. self._tiles[(tileI, tileJ)]: Boolean flag (True = generated) Dictionary stores only generation status, not tile data itself. This minimizes memory overhead for tile management. See Also -------- _evaluateTile : Core tile generation implementation _isGenerated : Check tile generation status sample : Triggers tile generation on-demand sampleRegion : Generates multiple tiles as needed _calcTileSize : Determines tile dimensions """ # Guard if self._tiles.get((tileI, tileJ), False): return # Determine array index bounds for this tile iMin = tileI * self._tileSize iMax = (tileI + 1) * self._tileSize jMin = tileJ * self._tileSize jMax = (tileJ + 1) * self._tileSize # Generate tile tile = self._evaluateTile(iMin, iMax, jMin, jMax) # Write to noise array self.noise[jMin:jMax, iMin:iMax] = tile # Mark tile as generated self._tiles[(tileI, tileJ)] = True
#--------------------------------------------------------------------------
[docs] def _evaluateTile(self, iMin:int, iMax:int, jMin:int, jMax:int, )->NPFltArr: """ Evaluate multi-octave Perlin noise for a single tile without caching. Computes normalized Perlin noise over specified rectangular region by accumulating contributions from all octaves. Each octave is evaluated at progressively higher frequency and lower amplitude. Results are normalized to [0, 1] range using precomputed empirical bounds. This is the core computation method for tile-based lazy evaluation. Parameters ---------- iMin : int Column index (x direction, horizontal) lower bound (inclusive). iMax : int Column index upper bound (exclusive). jMin : int Row index (y direction, vertical) lower bound (inclusive). jMax : int Row index upper bound (exclusive). Returns ------- noise : ndarray, shape (jMax-jMin, iMax-iMin) Normalized Perlin noise values in range [0, 1]. Note array indexing is [row, column] = [j, i]. Notes ----- **Tile Evaluation Algorithm:** 1. **Region Dimensions:** Calculate tile width and height from bounds: >>> width = iMax - iMin >>> height = jMax - jMin 2. **Base Frequency Calculation:** Determine fundamental frequency from scale parameter: >>> freq = self.size / self.scale Lower scale -> higher frequency -> finer features. 3. **Octave Accumulation:** For each octave in range [0, self.octaves): a. **Frequency and Amplitude:** >>> currFreq = freq * (2.0 ** octave) >>> currAmp = self.persistence ** octave Frequency doubles each octave (frequency_i = freq * 2^i). Amplitude decays each octave (amplitude_i = persistence^i). b. **Frequency Space Mapping:** Convert tile bounds to Perlin coordinate space: >>> iStart = (iMin / self.size) * currFreq >>> iEnd = (iMax / self.size) * currFreq >>> jStart = (jMin / self.size) * currFreq >>> jEnd = (jMax / self.size) * currFreq Maps array indices to frequency-scaled coordinates. c. **Coordinate Grid Generation:** Create meshgrid spanning frequency space region: >>> iCoords = np.linspace(iStart, iEnd, width, endpoint=False) >>> jCoords = np.linspace(jStart, jEnd, height, endpoint=False) >>> iMesh, jMesh = np.meshgrid(iCoords, jCoords) Generates width x height grid of evaluation points. d. **Perlin Evaluation:** Compute raw Perlin noise at meshgrid coordinates: >>> currNoise = self._perlin(iMesh, jMesh) * currAmp Applies amplitude scaling to this octave's contribution. e. **Accumulation:** Add octave contribution to cumulative noise: >>> noise += currNoise Builds up fractal detail across frequency scales. 4. **Normalization:** Apply pre-computed bounds for consistent [0, 1] mapping: >>> return self._normalize(noise) Uses self._min and self._diff from _calcNormalizationBounds(). **Multi-Octave Generation:** Fractal detail achieved by summing octaves at different frequencies: - Octave 0: Base frequency (freq), full amplitude (1.0) - Octave 1: Double frequency (2*freq), reduced amplitude (persistence) - Octave 2: Quadruple frequency (4*freq), reduced (persistence^2) - Octave i: Frequency = freq * 2^i, Amplitude = persistence^i Lower octaves contribute large-scale features (rolling hills). Higher octaves add fine detail (rocky texture). **Coordinate Space Transformation:** Array indices must be mapped to frequency space for Perlin evaluation: - Array space: Integer indices [0, size) with uniform spacing - Frequency space: Float coordinates [0, currFreq) with non-uniform features - Transformation: coord_freq = (index / size) * currFreq This ensures correct spatial frequency regardless of tile location. See Also -------- _genTile : Wrapper that calls this method and caches result evaluateRegion : Public method that uses this for direct evaluation _perlin : Core Perlin noise function used per octave _normalize : Applies empirical bounds for [0, 1] mapping _calcNormalizationBounds : Pre-computes bounds used in normalization Warnings -------- **Large Region Memory Risk:** This method does not enforce tile-based chunking. Requesting regions with max(width, height) > 6000 may cause memory allocation errors or system instability. Method assumes caller has already performed appropriate chunking for larger requests. **Internal Use Only:** This method is not intended for direct external calls. Public methods (sample, sampleRegion, etc) provide safe interfaces with automatic tile management. Direct calls here bypass safety checks. """ width = iMax - iMin height = jMax - jMin # Base frequency freq = self.size / self.scale # Allocate array noise = np.zeros((height, width), dtype=np.float64) # Sum noise across octaves for octave in range(self.octaves): # Frequency and amplitude for this octave currFreq = freq * (2.0 ** octave) currAmp = self.persistence ** octave # Map chunk bounds to frequency space iStart = (iMin / self.size) * currFreq iEnd = (iMax / self.size) * currFreq jStart = (jMin / self.size) * currFreq jEnd = (jMax / self.size) * currFreq # Create chunk coordinate arrays iCoords = np.linspace(iStart, iEnd, width, endpoint=False) jCoords = np.linspace(jStart, jEnd, height, endpoint=False) iMesh, jMesh = np.meshgrid(iCoords, jCoords) # Evaluate Perlin noise currNoise = self._perlin(iMesh, jMesh) * currAmp # Accumulate noise noise += currNoise return self._normalize(noise)
###############################################################################
[docs]class Pollution: """ Gaussian plume model for point-source pollution dispersion in ocean environments. Simulates the concentration distribution of a pollutant released from a fixed point source using a steady-state Gaussian plume model. Accounts for wind-driven advection, atmospheric dispersion, and plume rise effects. Suitable for modeling chemical spills, thermal discharges, or tracer releases in AUV sensor simulation. Parameters ---------- source : list of float, default=[0, 0, 30] Pollution source location [x, y, z] in meters (END frame). - x: East coordinate - y: North coordinate - z: Depth below surface (positive value; stored as negative internally) Q : float, default=1.59 Source emission rate (strength) in grams per second (g/s). Typical values: 0.5-5.0 g/s for tracer experiments. u : float, default=1.5 Wind/current speed in meters per second (m/s). Valid range: [0.1, 2.5] m/s (automatically clipped to bounds). Drives horizontal advection of plume. v : float, default=pi/4 (45 deg) Wind/current direction in radians. Measured counterclockwise from East axis (END convention): - 0: East - pi/2: North - pi: West - -pi/2: South seed : int, optional PRNG seed for reproducibility when using random parameters. If None and randomness enabled, generates random seed from entropy. random : bool, default=False If True, randomizes both u and v within default ranges. Overrides explicit u and v values. randomU : bool or list of float, default=False Wind speed randomization: - True: Randomize u within [0.1, 2.5] m/s - [low, high]: Randomize u within custom range - False: Use explicit u parameter randomV : bool or list of float, default=False Wind direction randomization: - True: Randomize v within [0, 2pi] radians - [low, high]: Randomize v within custom range - False: Use explicit v parameter oceanSize : int, default=1000 Side length of ocean floor area in meters. Used for boundary checking and visualization. oceanOrigin : list of float, default=[500, 500] Coordinates [x, y] where (0, 0) maps to floor array. oceanDepth : float, default=125 Maximum ocean depth in meters (positive value). Concentration set to zero beyond this depth. Attributes ---------- x_s : float Source x-coordinate (East) in meters. y_s : float Source y-coordinate (North) in meters. z_s : float Source depth in meters (stored as negative value). Q : float Emission rate in g/s. u : float Wind/current speed in m/s (clipped to [0.1, 2.5]). v : float Wind/current direction in radians (wrapped to [0, 2*pi]). source : list of float Combined [x_s, y_s, z_s] property (z returned as positive). seed : int PRNG seed used for randomization. oceanSize : int Ocean area dimension for boundaries. oceanOrigin : list of float Origin coordinates for coordinate system. oceanDepth : float Maximum depth for concentration calculations. Methods ------- __call__(x, y, z) Pollution instance as callable, calculate concentration at coordinates (x, y, z). Supports scalar or array inputs. Returns concentration in g/m^3. get2D(x, y, z, size) Generate 2D concentration array at specified depth. Returns (concentration, X_mesh, Y_mesh). get3D(x, y, z, size) Generate 3D concentration volume (memory-intensive). Returns (concentration, X_mesh, Y_mesh, Z_mesh). automesh(size, res, center, is3d) Create coordinate meshgrids centered on plume distribution. display2D(x, y, z, size) Visualize 2D concentration contour plot at depth z. display3D(x, y, z, size) Visualize 3D isosurface of concentration distribution. Notes ----- **Gaussian Plume Theory:** - **Physical Model:** The pollution plume is simulated using a basic Gaussian fluid mechanics model. Assuming wind along the x-axis, the pollutant concentration C at any point (x, y, z) is given by: .. code-block:: none C(x, y, z) = (Q / (2*pi*u*sigma_y*sigma_z)) * exp(-y'^2/(2*sigma_y^2)) * [exp(-(z-H_e)^2/(2*sigma_z^2)) + exp(-(z+H_e)^2/(2*sigma_z^2))] where: - Q: Emission rate (g/s) - u: Wind speed (m/s) - (x', y'): Rotated coordinates aligned with wind direction - sigma_y: Horizontal dispersion coefficient (m) - sigma_z: Vertical dispersion coefficient (m) - H_e: Effective release height (m) - z: Depth coordinate (negative downward) - **Coordinate Rotation:** Wind direction v rotates the coordinate system: .. code-block:: none x' = (x - x_s) * cos(v) + (y - y_s) * sin(v) y' = -(x - x_s) * sin(v) + (y - y_s) * cos(v) Aligns x' axis with wind direction for plume advection. - **Effective Release Height:** Plume buoyancy causes vertical rise: .. code-block:: none H_e = z_s + Delta h(x') Delta h(x') = 2.126 * 10^-4 * |x'|^(2/3) Rise increases with distance from source. - **Dispersion Coefficients:** Empirical formulas for neutral stability: .. code-block:: none sigma_y = 1.360 * |x'|^(0.82) sigma_z = 0.275 * |x'|^(0.69) Dispersion grows with distance, creating characteristic plume cone shape. **Model Assumptions:** 1. Steady-state: Concentration field does not change with time 2. Constant wind: Uniform u and v throughout domain 3. Homogeneous medium: No density stratification or temperature gradients 4. Point source: Instantaneous release from infinitesimal volume 5. Passive tracer: No chemical reactions or biological uptake 6. Neutral stability: No strong buoyancy effects (moderate temperature) **Validity Regime:** - Distance from source: 100m - 10km - Wind speed: 0.5 - 10 m/s (underwater currents typically 0.1-2 m/s) - Flat terrain (ocean floor variations negligible) - Low-moderate emission rates (Q < 10 g/s) **Limitations:** - Does not model time-varying sources - Ignores turbulent mixing in wake of AUV or obstacles - No chemical decay or transformation - Ground reflection term simplistic **Coordinate System:** Uses END (East-North-Down) convention throughout: - Positive z is depth below surface - Internally stored as negative for calculation consistency - User inputs/outputs use positive depth convention **Boundary Conditions:** - Ocean surface (z = 0): Perfect reflection (concentration decays to zero) - Ocean floor (z = -oceanDepth): Perfect reflection - Lateral boundaries: Open (concentration -> 0 as distance -> infinity) - Upwind of source (x' < 0): Concentration = 0 (no upstream diffusion) **Numerical Stability:** - Division by zero protection at source (x' -> R_MIN = 0.01 m) - Exponential overflow prevented by clipping large arguments - Dispersion coefficients bounded to avoid singularities Warnings -------- - get3D() method can cause out-of-memory errors for domains larger than ~1000x1000x100 points. Use get2D() for large-scale visualization. - Concentration calculation becomes unreliable very near source (r < 1 meter). Model assumes far-field dispersion regime. - Wind speed u < 0.5 m/s may produce unrealistic plume spreading (stagnation effects not modeled). - No validation for underwater releases (model derived from atmospheric dispersion). Use with caution for ocean applications. See Also -------- Ocean : Container class that manages Pollution instance Current1D : Provides wind/current speed and direction for u, v parameters navigation.Sensor : Base class for sensors that could sample concentration References ---------- [1] A. A. Abdel-Rahman, “On the atmospheric dispersion and gaussian plume model,” in Proceedings of the 2nd International Conference on Waste Management, Water Pollution, Air Pollution, Indoor Climate, Corfu, Greece, vol. 26, 2008. Examples -------- ### Create pollution source with defaults: >>> import munetauvsim.environment as env >>> pollution = env.Pollution() >>> print(pollution) Pollution Source: (0.00, 0.00, 30.00) Strength: 1.59 g/s Speed: 1.50 m/s at 0.79 rad Seed: None ### Custom pollution parameters: >>> pollution = env.Pollution( ... source=[200, 300, 25], # 200m East, 300m North, 25m depth ... Q=2.5, # 2.5 g/s emission rate ... u=0.8, # 0.8 m/s current speed ... v=np.pi/6, # 30 deg from East (ENE direction) ... seed=42 ... ) ### Query concentration at single point: >>> pollution = env.Pollution(source=[0, 0, 30], Q=2.0, u=1.0, v=0) >>> # Sample 100m East, 50m North, 28m depth >>> conc = pollution(100, 50, 28) >>> print(f"Concentration: {conc:.4e} g/m^3") Concentration: 5.4460e-04 g/m^3 # Example value ### Query concentration at array of points: >>> x_points = np.array([50, 100, 150, 200]) >>> y_points = np.array([0, 0, 0, 0]) >>> z_points = np.array([28, 28, 28, 28]) >>> concentrations = pollution(x_points, y_points, z_points) >>> print(concentrations) [0.00205464 0.00077644 0.00042928 0.00028042] # Example values ### Generate 2D concentration map at depth: >>> pollution = env.Pollution(source=[500, 500, 30], u=1.2, v=np.pi/4) >>> C, X, Y = pollution.get2D( ... x=[400, 700], ... y=[400, 700], ... z=28 # 2m above source ... ) >>> print(f"Concentration array shape: {C.shape}") Concentration array shape: (300, 300) >>> print(f"Max concentration: {C.max():.4e} g/m^3") Max concentration: 5.7852e-03 g/m^3 ### Random pollution for Monte Carlo studies: >>> # Random speed and direction >>> poll_random = env.Pollution( ... source=[0, 0, 30], ... Q=1.5, ... random=True, ... seed=12345 ... ) >>> print(f"Random u: {poll_random.u:.2f} m/s") Random u: 0.65 m/s >>> print(f"Random v: {poll_random.v:.2f} rad") Random v: 1.99 rad >>> >>> # Custom random ranges >>> poll_custom = env.Pollution( ... source=[0, 0, 30], ... Q=1.5, ... randomU=[0.5, 1.5], # Speed range ... randomV=[0, np.pi/2], # Direction range (East to North) ... seed=67890 ... ) >>> poll_custom.display2D() ### Integration with Ocean environment: >>> ocean = Ocean.calm_ocean(createPlume=True) >>> # Pollution automatically created with ocean current parameters >>> print(ocean.pollution) >>> # Query concentration at vehicle position >>> vehicle_pos = [250, 300, 20] >>> conc = ocean.pollution(*vehicle_pos) ### AUV data collection: >>> # Define pollution field >>> pollution = env.Pollution(source=[500, 500, 25], Q=3.0, u=1.0, v=0) >>> >>> # Simulate AUV path >>> import munetauvsim.guidance as guid >>> path = guid.Waypoint([0, 500, 1000], [500, 500, 500], [20, 20, 20]) >>> >>> # Sample along path >>> import matplotlib.pyplot as plt >>> concentrations = [] >>> for i in range(len(path)): ... x, y, z = path.pos[i] ... # Build this out into a custom navigation.Sensor subclass ... conc = pollution(x, y, z) ... concentrations.append(conc) >>> >>> plt.plot(concentrations) >>> plt.xlabel('Waypoint index') >>> plt.ylabel('Concentration (g/m^3)') >>> plt.title('Concentration profile along AUV path') >>> plt.show() ### Memory-conscious 3D volume: >>> # For large domains, use subregions >>> pollution = env.Pollution(source=[0, 0, 30], u=1.5, v=0) >>> # Only compute small region around AUV position >>> C, X, Y, Z = pollution.get3D( ... x=[0, 100], # 100m extent ... y=[-20, 20], # 40m width ... z=[25, 35] # 10m depth range ... ) >>> print(f"3D array shape: {C.shape}") """ ## Class Constants =======================================================# _U_MIN = 0.1 # Minimum wind speed in m/s _U_MAX = Current1D._V_HI # Maximum wind speed in m/s _V_MIN = 0 # Minimum wind direction in radians _V_MAX = 2 * np.pi # Maximum wind direction in radians _R_MIN = 0.01 # Minimum source distance for dispersion ## Constructor ===========================================================#
[docs] def __init__(self, source:Union[NPFltArr,List[float]] = [0, 0, 30], Q:float=1.59, u:Number=1.5, v:Number=np.pi/4, seed:Optional[int] = None, random:bool=False, randomU:Union[bool, List[float]]=False, randomV:Union[bool, List[float]]=False, oceanSize:int = 1000, oceanOrigin:List[float] = [500, 500], oceanDepth:float = 125, **kwargs, ): """ Initialize Pollution source with dispersion parameters and randomization options. Constructs a Gaussian plume pollution model with specified source location, emission rate, and wind/current conditions. Supports deterministic parameters or randomized generation for Monte Carlo simulations. Automatically handles coordinate system conventions and parameter validation. Parameters ---------- source : list of float or ndarray, default=[0, 0, 30] Pollution source coordinates [x, y, z] in meters (END frame). - x: East position (can be negative if origin offset) - y: North position (can be negative if origin offset) - z: Depth below surface (input as positive, stored as negative) Default places source at origin, 30m depth. Q : float, default=1.59 Source emission rate (strength) in grams per second (g/s). Represents mass flux of pollutant released per unit time. Typical values: - Tracer experiments: 0.1-2.0 g/s - Small spills: 2.0-10.0 g/s - Large industrial discharges: 10-100 g/s Must be positive. No upper bound enforced (user responsibility). u : float, default=1.5 Wind or current speed in meters per second (m/s). Drives horizontal advection of plume. Valid range: [0.1, 2.5] m/s. Values outside range are automatically clipped with warning. Typical ocean currents: 0.1-1.0 m/s. Ignored if random=True or randomU specified. v : float, default=pi/4 (45 degrees) Wind or current direction in radians (END convention). Measured counterclockwise from East axis: - 0: East - pi/2: North - pi: West - -pi/2 or 3pi/2: South Input automatically wrapped to [0, 2*pi] range. Ignored if random=True or randomV specified. seed : int, optional Pseudo-random number generator seed for reproducibility. Used when random=True, randomU=True, or randomV=True. If None and randomization enabled, generates unique seed from system entropy. If None and no randomization, no RNG used (deterministic). random : bool, default=False Master randomization flag. If True: - Generates random u from [0.1, 2.5] m/s - Generates random v from [0, 2pi] radians - Overrides explicit u and v parameters - Triggers seed generation if seed=None Useful for quick Monte Carlo setup without specifying ranges. randomU : bool or list of float, default=False Wind speed randomization control: - False: Use explicit u parameter (default) - True: Randomize u within [_U_MIN=0.1, _U_MAX=2.5] m/s - [low, high]: Randomize u within custom range [low, high] m/s Takes precedence over random flag for u parameter. Custom ranges not validated (user must ensure physical validity). randomV : bool or list of float, default=False Wind direction randomization control: - False: Use explicit v parameter (default) - True: Randomize v within [0, 2pi] radians - [low, high]: Randomize v within custom range [low, high] radians Takes precedence over random flag for v parameter. Custom ranges automatically wrapped to [0, 2pi]. oceanSize : int, default=1000 Side length of ocean floor area in meters. Used for: - Boundary condition enforcement (concentration -> 0 beyond edges) - Default domain for get2D(), get3D(), automesh() - Coordinate wrapping in visualization methods Should match Ocean.size if using integrated environment. oceanOrigin : list of float, default=[500, 500] Origin offset coordinates [x_o, y_o] in meters. Defines where (x=0, y=0) maps in absolute coordinate system. Allows negative coordinates without array indexing issues. Should match Ocean.origin if using integrated environment. oceanDepth : float, default=125 Maximum ocean depth in meters (positive value). Used as lower boundary condition: - Concentration set to 0 for z < -oceanDepth - Prevents unrealistic plume penetration below seafloor Should match Ocean.floor.z + Ocean.floor.z_range if using integrated environment for consistency. **kwargs Additional keyword arguments. Attributes ---------- x_s : float Source East coordinate (stored directly). y_s : float Source North coordinate (stored directly). z_s : float Source depth (stored as negative value internally). Q : float Emission rate (stored directly). u : float Wind speed (after validation/randomization, clipped to [0.1, 2.5]). v : float Wind direction (after validation/randomization, wrapped to [0, 2*pi]). seed : int or None Actual seed used (generated if random and seed=None). _rng : numpy.random.Generator or None RNG instance (used only if randomization requested). oceanSize : int Stored for boundary/visualization methods. oceanOrigin : list of float Stored for coordinate transformations. oceanDepth : float Stored for depth boundary enforcement. Notes ----- **Initialization Process** 1. **Source Coordinates:** Unpacks [x, y, z] from source parameter. Converts z to negative value: z_s = -abs(z_input). 2. **Emission Rate:** Stores Q directly (no validation, user must ensure Q > 0). 3. **Randomization Setup:** If any randomization flag True and seed=None: - Generates seed from np.random.SeedSequence().entropy - Creates RNG instance: self._rng = np.random.default_rng(seed) 4. **Wind Speed Determination (priority order):** a. If random=True or randomU=True: Uniform random in [0.1, 2.5] b. If randomU=[low, high]: Uniform random in [low, high] c. Otherwise: Use explicit u parameter Finally: Clip to [0.1, 2.5] via u.setter validation. 5. **Wind Direction Determination (priority order):** a. If random=True or randomV=True: Uniform random in [0, 2pi] b. If randomV=[low, high]: Uniform random in [low, high] c. Otherwise: Use explicit v parameter Finally: Wrap to [0, 2pi] via v.setter validation. 6. **Ocean Parameters:** Store oceanSize, oceanOrigin, oceanDepth for later use. **Coordinate Convention:** User provides z as positive depth (intuitive), but internally stored as negative for calculation consistency with mathematical model implemented. >>> pollution = Pollution(source=[0, 0, 30]) >>> pollution.z_s # Stored as -30 -30.0 **Randomization Examples:** Full randomization: >>> poll = Pollution(random=True, seed=42) >>> # Both u and v randomized with seed 42 Partial randomization: >>> poll = Pollution(u=1.0, randomV=True, seed=42) >>> # u fixed at 1.0, only v randomized Custom ranges: >>> poll = Pollution( ... randomU=[0.5, 1.5], ... randomV=[0, np.pi], # East to West only ... seed=42 ... ) **Parameter Validation:** u and v automatically validated by property setters: - u clipped to [0.1, 2.5] with warning if out of bounds - v wrapped to [0, 2pi] with warning if out of bounds - Q not validated (user must ensure Q > 0) - z always stored as negative (automatic conversion) **Integration with Ocean:** When Ocean creates Pollution, passes current parameters: >>> ocean = Ocean( ... spd='moderate', # Current speed category ... ang='northeast', # Current direction category ... plume=[200, 300, 25], # Source location ... # ... ... ) >>> # Ocean.__init__ calls: >>> pollution = Pollution( ... source=plume, ... u=ocean.current.v_spd, # Mean speed from Current1D ... v=ocean.current.b_ang, # Mean angle from Current1D ... oceanSize=size, ... oceanOrigin=origin, ... oceanDepth=ocean.floor.z, ... # ... ... ) **Log Warnings** Logs warning if u or v out of valid bounds and clipping applied. See Also -------- Ocean.__init__ : Creates Pollution with current-derived parameters Current1D : Provides u and v from v_spd and b_ang attributes Examples -------- ### Minimal initialization: >>> import munetauvsim.environment as env >>> pollution = env.Pollution() >>> print(f"Source: {pollution.source}") Source: [0.0, 0.0, -30.0] >>> print(f"Emission: {pollution.Q} g/s") Emission: 1.59 g/s >>> print(f"Wind: {pollution.u} m/s at {pollution.v} rad") Wind: 1.5 m/s at 0.7853981633974483 rad ### Custom deterministic parameters: >>> pollution = env.Pollution( ... source=[250, 300, 20], ... Q=2.5, ... u=0.8, ... v=np.pi/3, # 60 degrees ... oceanSize=2000, ... oceanDepth=150 ... ) ### Mixed random and fixed: >>> pollution = env.Pollution( ... source=[0, 0, 30], ... Q=1.5, ... u=1.0, # Fixed speed ... randomV=[0, np.pi/2], # Random direction (East to North) ... seed=42 ... ) >>> print(f"Fixed u: {pollution.u}") Fixed u: 1.0 >>> print(f"Random v: {pollution.v}") Random v: 1.2157273181723998 ### Validate parameter clipping: >>> pollution = env.Pollution(u=5.0, v=10*np.pi) WARNING: Wind speed out of bounds, clipping to 2.50 m/s WARNING: Wind direction out of bounds, clipping to 0.00 radians >>> pollution.u 2.5 >>> pollution.v 0.0 """ self.x_s, self.y_s, self.z_s = source self.Q = Q # Randomness if (((random) or (randomV is not False) or (randomU is not False)) and (seed is None)): seed = np.random.SeedSequence().entropy self.seed = seed self._rng = np.random.default_rng(self.seed) # Set wind speed if ((random) or (randomU is True)): self.u = self._rng.uniform(self._U_MIN, self._U_MAX) if (isinstance(randomU, list)): self.u = self._rng.uniform(randomU[0], randomU[1]) if ('_u' not in self.__dict__): self.u = u # Set wind direction if ((random) or (randomV is True)): self.v = self._rng.uniform(self._V_MIN, self._V_MAX) if (isinstance(randomV, list)): self.v = self._rng.uniform(randomV[0], randomV[1]) if ('_v' not in self.__dict__): self.v = v self.oceanSize = oceanSize self.oceanOrigin = oceanOrigin self.oceanDepth = oceanDepth
## Properties ============================================================# @property def u(self) -> float: """Wind speed in m/s.""" return self._u @u.setter def u(self, u:float) -> None: """Set wind speed with bounds check.""" u_clip = np.clip(u, self._U_MIN, self._U_MAX) if (u != u_clip): log.warning( "Wind speed out of bounds, clipping to %.2f m/s", u_clip) self._u = u_clip #. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . @property def v(self) -> float: """Wind direction in radians.""" return self._v @v.setter def v(self, v:float) -> None: """Set wind direction with bounds check.""" v_norm = v % (2 * np.pi) v_clip = np.clip(v_norm, self._V_MIN, self._V_MAX) if (v != v_clip): log.warning( "Wind direction out of bounds, clipping to %.2f radians", v_clip) self._v = v_clip #. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . @property def source(self) -> List[float]: """Coordinates of the pollution source.""" return [self.x_s, self.y_s, self.z_s] @source.setter def source(self, source:List[float]) -> None: """Set the pollution source coordinates.""" self.x_s, self.y_s, z_s = source self.z_s = -abs(z_s) #. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . @property def z_s(self) -> float: """Depth of the pollution source.""" return self._z_s @z_s.setter def z_s(self, z_s:float) -> None: """Set the depth of the pollution source.""" self._z_s = -abs(z_s) ## Special Methods =======================================================#
[docs] def __call__(self, x: Union[Number, List[Number], NPFltArr], y: Union[Number, List[Number], NPFltArr], z: Union[Number, List[Number], NPFltArr], ) -> Union[float, NPFltArr]: """ Return concentration at (x, y, z) coordinates. Parameters ---------- x, y, z : float, list, or ndarray Coordinates. Supports scalar or array inputs. Returns ------- concentration : float or ndarray Pollutant concentration in g/m^3. Scalar if inputs scalar, array if inputs array. """ # Convert input to arrays for consistent processing inputIsScalar = np.isscalar(x) and np.isscalar(y) and np.isscalar(z) x, y, z = map(np.asarray, (x, y, z)) # Enforce consistent usage of z coordinate z = -abs(z) # Get coordinates relative to pollution source x_r = x - self.x_s y_r = y - self.y_s # Rotate coordinates based on wind direction x_rot = x_r * np.cos(self.v) + y_r * np.sin(self.v) y_rot = -x_r * np.sin(self.v) + y_r * np.cos(self.v) # Avoid divide by zero errors near source x_rot = np.where(x_rot == 0, self._R_MIN, x_rot) # Calculate effective release height delta_h = 2.126e-4 * np.abs(x_rot)**(2/3) He = self.z_s + delta_h # Calculate dispersion coefficients sigma_y = 1.36 * np.abs(x_rot)**0.82 sigma_z = 0.275 * np.abs(x_rot)**0.69 # Calculate concentration coeff = self.Q / (2 * np.pi * self.u * sigma_y * sigma_z) exp_y = np.exp(-y_rot**2 / (2 * sigma_y**2)) exp_z = (np.exp(-(z - He)**2 / (2 * sigma_z**2)) + np.exp(-(z + He)**2 / (2 * sigma_z**2))) # Set concentration to 0 for opposite direction of wind concentration = np.where(x_rot >= 0, coeff * exp_y * exp_z, 0) # Set ocean surface and floor as boundaries concentration = np.where((z > 0) | (z < -abs(self.oceanDepth)), 0, concentration) # Return scalar if inputs were scalar return concentration.item() if inputIsScalar else concentration
#--------------------------------------------------------------------------
[docs] def __repr__(self) -> str: """Detailed description of Pollution.""" fmt = '.2f' return ( f"{self.__class__.__name__}(" f"x={self.x_s:{fmt}}, " f"y={self.y_s:{fmt}}, " f"z={abs(self.z_s):{fmt}}, " f"Q={self.Q:{fmt}}, " f"u={self.u:{fmt}}, " f"v={self.v:{fmt}}, " f"seed={self.seed})" )
#--------------------------------------------------------------------------
[docs] def __str__(self) -> str: """User friendly description of Pollution.""" fmt = '.2f' cw = 16 return ( f"Pollution\n" f"{' Source:':{cw}} ({self.x_s:{fmt}}, {self.y_s:{fmt}}, " f"{abs(self.z_s):{fmt}})\n" f"{' Strength:':{cw}} {self.Q:{fmt}} g/s\n" f"{' Speed:':{cw}} {self.u:{fmt}} m/s at {self.v:{fmt}} rad\n" f"{' Seed:':{cw}} {self.seed}\n" )
## Methods ===============================================================#
[docs] def get2D(self, x:Optional[List[float]] = None, y:Optional[List[float]] = None, z:Optional[float] = None, size:Optional[int] = None, )->Tuple[NPFltArr, NPFltArr, NPFltArr]: """ Return 2D array of pollution concentration at depth z and meshgrids. Parameters ---------- x, y : list, optional [min, max] bounds. If None, uses oceanSize. z : float, optional Depth for 2D slice. If None, uses value 20m above z_s. size : int, optional Domain size if x,y not provided. Default is oceanSize. Returns ------- C : ndarray 2D concentration array. X, Y : ndarray Meshgrid coordinate arrays. Notes ----- **Precision Trade-off** Fixed resolution (1-unit spacing) creates data discrepancies compared to direct `__call__()` evaluations due to discretization effects. Method prioritizes convenience over precision for rapid 2D array generation. **Usage Recommendation:** - Use `get2D()` for visualization and general analysis - Use `__call__()` for exact point concentrations and sensor simulation """ if ((x is None) or (y is None)): size = self.oceanSize if (size is None) else size o = self.oceanOrigin x = [-o[0], size - o[0]] y = [-o[1], size - o[1]] x_l = np.linspace(x[0], x[1], int(x[1] - x[0])) y_l = np.linspace(y[0], y[1], int(y[1] - y[0])) dz = 20 z = self.z_s + dz if (z is None) else -abs(z) X, Y = np.meshgrid(x_l, y_l) return self(X, Y, z), X, Y
#-------------------------------------------------------------------------#
[docs] def get3D(self, x:Optional[List[float]] = None, y:Optional[List[float]] = None, z:Optional[List[float]] = None, size:Optional[int] = None, )->Tuple[NPFltArr, NPFltArr, NPFltArr, NPFltArr]: """ Return 3D array of pollution concentration and meshgrids. Parameters ---------- x, y : list, optional [min, max] bounds. If None, uses oceanSize. z : list, optional [min, max] depth bounds. If None, uses full ocean depth. size : int, optional Domain size if x,y not provided. Default is oceanSize. Returns ------- C : ndarray 3D concentration array. X, Y, Z : ndarray Meshgrid coordinate arrays. Notes ----- **Memory Limitation:** Large 3D arrays can exceed available memory. Arrays greater than approximately (1000,1000,70) require more than ~250M total data points for all 4 returned data structures. This method is retained for caching smaller local regions at a time. **Design Intent:** Method provides foundation for pre-computed concentration sampling to avoid repeated calculations during simulation. **Performance Recommendation:** Use smaller domains or consider boundary arrays instead of full meshgrids for stability. """ if ((x is None) or (y is None)): size = self.oceanSize if (size is None) else size o = self.oceanOrigin x = [-o[0], size - o[0]] y = [-o[1], size - o[1]] if (z is None): z = [-self.oceanDepth, 0] x_l = np.linspace(x[0], x[1], int(x[1] - x[0])) y_l = np.linspace(y[0], y[1], int(y[1] - y[0])) z_l = np.linspace(z[0], z[1], int(z[1] - z[0])) X, Y, Z = np.meshgrid(x_l, y_l, z_l) return self(X, Y, Z), X, Y, Z
#-------------------------------------------------------------------------#
[docs] def automesh(self, size:Optional[int] = None, res:int = 100, center:bool = False, is3d:bool = False, )->Union[Tuple[NPFltArr, NPFltArr], Tuple[NPFltArr, NPFltArr, NPFltArr]]: """ Return meshgrid arrays around pollution distribution. Parameters ---------- size : int, optional Domain size. If None, uses oceanSize. res : int, default=100 Number of points per dimension. center : bool, default=False If True, pollution source is put at the center. Otherwise, source is off-center and area is centered around the plume. is3d : bool, default=False If True, return 3D meshgrid. Returns ------- X, Y : ndarray (if is3d=False) 2D meshgrid arrays. X, Y, Z : ndarray (if is3d=True) 3D meshgrid arrays. """ size = self.oceanSize if (size is None) else size half = size/2 if (center): x = np.linspace(self.x_s - half, self.x_s + half, res) y = np.linspace(self.y_s - half, self.y_s + half, res) else: o = 0.05 # source offset from edge x_w = np.cos(self.v) # weight in x y_w = np.sin(self.v) # weight in y x_s = size * (1-2*o) * x_w/2 # center shift in x y_s = size * (1-2*o) * y_w/2 # center shift in y x = np.linspace(self.x_s + x_s - half, self.x_s + x_s + half, res) y = np.linspace(self.y_s + y_s - half, self.y_s + y_s + half, res) if (is3d): z = np.linspace(self.z_s-10, 0, res) return np.meshgrid(x, y, z) return np.meshgrid(x, y)
#-------------------------------------------------------------------------#
[docs] def display2D(self, z:Optional[int] = None, size:Optional[int] = None, resolution:int = 100, fromOcean:bool = False, )->None: """ Display 2D image of pollution concentration with contour lines. Parameters ---------- z : float, optional Depth for 2D slice. If None, uses value 20m above z_s. size : int, optional Domain size. Default is oceanSize. resolution : int, default=100 Number of points per dimension. fromOcean : bool, default=False If True, uses the ocean dimensions to set the domain size. """ if (z is None): z = self.z_s + 20 if (not fromOcean): size = self.oceanSize if (size is None) else size X, Y = self.automesh(size, resolution, center=False) Z = self(X, Y, -abs(z)) else: Z, X, Y = self.get2D(z=z) # Adjust alpha values in color map cmap = plt.cm.viridis_r cmap_a = cmap(np.arange(cmap.N)) split = int(cmap.N * 0.2) a = np.linspace(0, 1, split) alpha = a**1.6 cmap_a[:split,-1] = alpha cmap_a = mpl.colors.ListedColormap(cmap_a) # Create filled contour plot plt.figure(figsize=(12, 10)) contour = plt.contourf(X, Y, Z, levels=50, cmap=cmap_a) # Add colorbar plt.colorbar(contour, label='Concentration') # Highlight pollution source plt.scatter(self.x_s, self.y_s, color='red', s=100, marker='*', label='Pollution Source') plt.title( f'2D Pollution Concentration\nWind Speed: {self.u:.2f} m/s, ' f'Direction: {np.degrees(self.v):.1f}°\n' f'Source depth: {abs(self.z_s):.1f} m, Plot depth: {abs(z):.1f} m', fontsize=16) plt.xlabel('X (m)', fontsize=12) plt.ylabel('Y (m)', fontsize=12) plt.legend() plt.tight_layout() plt.show() plt.savefig('pollution_concentration_2D.png', bbox_inches='tight')
#-------------------------------------------------------------------------#
[docs] def display3D(self, size:Optional[int] = None, resolution:int = 50, )->None: """ Display 3D representation of pollution concentration. Parameters ---------- size : int, optional Domain size. Default is oceanSize. resolution : int, default=50 Number of points per dimension. """ size = self.oceanSize if (size is None) else size X, Y, Z = self.automesh(size, resolution, center=False, is3d=True) C = self(X, Y, Z) # Create a figure with 3D axes fig = plt.figure(figsize=(14, 12)) ax = fig.add_subplot(111, projection='3d') # Normalize concentration for color mapping C_normalized = (C - np.min(C)) / (np.max(C) - np.min(C)) # Adjust alpha values in color map cmap = plt.cm.viridis_r cmap_a = cmap(np.arange(cmap.N)) split = int(cmap.N * 0.1) a = np.linspace(0, 1, split) alpha = a**1.6 cmap_a[:split,-1] = alpha cmap_a = mpl.colors.ListedColormap(cmap_a) # Create a continuous 3D volume rendering using pcolormesh xx, yy = X[:,:,0], Y[:,:,0] for i in range(resolution): z_slice = Z[:,:,i] c_slice = C_normalized[:,:,i] ax.plot_surface(xx, yy, z_slice, facecolors=cmap_a(c_slice), rstride=1, cstride=1, shade=False) ax.set_title( f'3D Pollution Concentration\nWind Speed: {self.u:.2f} m/s, ' f'Direction: {np.degrees(self.v):.2f}°', fontsize=16) ax.set_xlabel('X (m)', fontsize=12) ax.set_ylabel('Y (m)', fontsize=12) ax.set_zlabel('Z (m)', fontsize=12) # Add colorbar norm = plt.Normalize(C_normalized.min(), C_normalized.max()) sm = plt.cm.ScalarMappable(cmap=cmap_a, norm=norm) sm.set_array([]) cbar = plt.colorbar(sm, ax=ax, shrink=0.5, aspect=10, pad=0.1) cbar.set_label('Concentration', fontsize=12) plt.tight_layout() plt.show() plt.savefig('pollution_concentration_3D.png', bbox_inches='tight')
###############################################################################