Swarm¶
Central orchestrator that manages a fleet of drones.
Each drone communicates over its own MAVLink connection. All public methods are async coroutines. State access is protected by per-drone asyncio locks to prevent race conditions.
enable_anomaly_detection(window_size=30)
¶
Enable telemetry anomaly detection (compare-to-neighbors pattern).
Once enabled, the telemetry loop will feed drone metrics into the
:class:AnomalyDetector every cycle and log any anomalies found.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
window_size
|
int
|
Number of readings to retain per metric for rolling statistics. |
30
|
disable_anomaly_detection()
¶
Disable anomaly detection.
set_geofence(polygon, alt_max_m=120.0, action='rtl')
¶
Configure a geofence for the swarm.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
polygon
|
list[tuple[float, float]]
|
List of |
required |
alt_max_m
|
float
|
Maximum altitude in metres AGL. |
120.0
|
action
|
str
|
Action on breach -- |
'rtl'
|
clear_geofence()
¶
Remove the current geofence.
enable_collision_avoidance(min_distance_m=5.0, method='orca')
¶
Enable automatic collision avoidance with the given minimum separation.
disable_collision_avoidance()
¶
Disable automatic collision avoidance.
enable_formation_hold(leader_id, offsets, gains=None)
¶
Enable continuous closed-loop formation correction.
Once enabled, the telemetry loop will call
:meth:FormationController.compute_corrections every cycle and send
adjusted goto commands to keep followers at their target offsets
from the leader.
Works alongside the existing :meth:formation method:
formation() sets the initial positions (open-loop),
enable_formation_hold() maintains them (closed-loop).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
leader_id
|
str
|
ID of the leader drone (must be registered). |
required |
offsets
|
dict[str, tuple[float, float, float]]
|
Per-follower NED offsets in meters from the leader.
Keys are follower drone IDs, values are
|
required |
gains
|
FormationGains | None
|
Optional :class: |
None
|
Raises:
| Type | Description |
|---|---|
KeyError
|
If leader_id is not a registered drone. |
disable_formation_hold()
¶
Stop closed-loop formation correction.
enable_path_planning(obstacles=None, resolution_m=5.0)
¶
Enable intelligent path planning for goto() commands.
When enabled, goto() automatically routes through the A*
path planner instead of flying in a direct line.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
obstacles
|
list[tuple[float, float, float]] | None
|
List of |
None
|
resolution_m
|
float
|
Grid cell size in metres for the planner. |
5.0
|
disable_path_planning()
¶
Disable intelligent path planning; goto() reverts to direct flight.
register_from_fleet(fleet_dir='fleet')
¶
Auto-register all drones from fleet registry JSON files.
connect_all()
async
¶
Connect to all registered drones and start the telemetry loop.
arm(drone_id, retries=3)
async
¶
Arm a drone. Returns True on success, False on failure.
NOTE: The telemetry loop must be paused before calling this,
otherwise motors_armed_wait() will race with the telemetry
reader for MAVLink messages.
takeoff(drone_id=None, altitude=None)
async
¶
Take off a single drone or all drones.
If drone_id is None, all registered drones take off.
rtl(drone_id=None)
async
¶
Return-to-launch for a single drone or all drones.
formation(pattern='v', spacing=15.0, heading=0.0, center=None, altitude=None)
async
¶
Move all drones into a formation pattern.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pattern
|
str
|
|
'v'
|
spacing
|
float
|
Distance in meters between drones. |
15.0
|
heading
|
float
|
Heading in degrees for the formation. |
0.0
|
center
|
tuple[float, float] | None
|
|
None
|
altitude
|
float | None
|
Altitude in meters; defaults to config default. |
None
|
rotate(degrees=360.0, duration_s=30.0, spacing=15.0)
async
¶
Rotate the current formation around its centroid.
Sends a series of formation commands at different headings to create a smooth rotation animation. Useful for demos and shows.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
degrees
|
float
|
Total rotation in degrees (default 360 = full turn). |
360.0
|
duration_s
|
float
|
Time in seconds for the full rotation. |
30.0
|
spacing
|
float
|
Formation spacing in meters. |
15.0
|
sweep(bounds, altitude=None)
async
¶
Execute an area-sweep mission.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
bounds
|
list[tuple[float, float]]
|
|
required |
altitude
|
float | None
|
Altitude in meters; defaults to config default. |
None
|
replan_on_loss(lost_drone_id)
¶
Redistribute a lost drone's waypoints optimally across active drones.
Uses the Hungarian algorithm via :func:~drone_swarm.allocation.replan_optimal
when scipy is available, falling back to nearest-neighbor assignment otherwise.
simulate(n_drones=3, *, config=None, sitl_path=None, base_port=5760, home=(35.363261, -117.669056), spacing_m=5.0, speedup=1, auto_connect=True)
async
classmethod
¶
Create a ready-to-fly simulated swarm.
Launches n_drones ArduPilot SITL instances, registers them as
sim-0, sim-1, ... sim-{n-1}, and optionally connects.
Returns (swarm, sim_harness) so the caller can later call
await sim_harness.stop() to tear everything down.
Example::
swarm, sim = await Swarm.simulate(n_drones=3)
await swarm.takeoff(altitude=10)
# ... fly ...
await swarm.shutdown()
await sim.stop()