Skip to content

Swarm

Central orchestrator that manages a fleet of drones.

Each drone communicates over its own MAVLink connection. All public methods are async coroutines. State access is protected by per-drone asyncio locks to prevent race conditions.

enable_anomaly_detection(window_size=30)

Enable telemetry anomaly detection (compare-to-neighbors pattern).

Once enabled, the telemetry loop will feed drone metrics into the :class:AnomalyDetector every cycle and log any anomalies found.

Parameters:

Name Type Description Default
window_size int

Number of readings to retain per metric for rolling statistics.

30

disable_anomaly_detection()

Disable anomaly detection.

set_geofence(polygon, alt_max_m=120.0, action='rtl')

Configure a geofence for the swarm.

Parameters:

Name Type Description Default
polygon list[tuple[float, float]]

List of (lat, lon) vertices defining the boundary.

required
alt_max_m float

Maximum altitude in metres AGL.

120.0
action str

Action on breach -- "rtl", "land", or "warn".

'rtl'

clear_geofence()

Remove the current geofence.

enable_collision_avoidance(min_distance_m=5.0, method='orca')

Enable automatic collision avoidance with the given minimum separation.

disable_collision_avoidance()

Disable automatic collision avoidance.

enable_formation_hold(leader_id, offsets, gains=None)

Enable continuous closed-loop formation correction.

Once enabled, the telemetry loop will call :meth:FormationController.compute_corrections every cycle and send adjusted goto commands to keep followers at their target offsets from the leader.

Works alongside the existing :meth:formation method: formation() sets the initial positions (open-loop), enable_formation_hold() maintains them (closed-loop).

Parameters:

Name Type Description Default
leader_id str

ID of the leader drone (must be registered).

required
offsets dict[str, tuple[float, float, float]]

Per-follower NED offsets in meters from the leader. Keys are follower drone IDs, values are (north_m, east_m, down_m) tuples.

required
gains FormationGains | None

Optional :class:FormationGains. Defaults to FormationGains() (P-only with kp=0.8).

None

Raises:

Type Description
KeyError

If leader_id is not a registered drone.

disable_formation_hold()

Stop closed-loop formation correction.

enable_path_planning(obstacles=None, resolution_m=5.0)

Enable intelligent path planning for goto() commands.

When enabled, goto() automatically routes through the A* path planner instead of flying in a direct line.

Parameters:

Name Type Description Default
obstacles list[tuple[float, float, float]] | None

List of (lat, lon, radius_m) no-fly zones.

None
resolution_m float

Grid cell size in metres for the planner.

5.0

disable_path_planning()

Disable intelligent path planning; goto() reverts to direct flight.

register_from_fleet(fleet_dir='fleet')

Auto-register all drones from fleet registry JSON files.

connect_all() async

Connect to all registered drones and start the telemetry loop.

arm(drone_id, retries=3) async

Arm a drone. Returns True on success, False on failure.

NOTE: The telemetry loop must be paused before calling this, otherwise motors_armed_wait() will race with the telemetry reader for MAVLink messages.

takeoff(drone_id=None, altitude=None) async

Take off a single drone or all drones.

If drone_id is None, all registered drones take off.

rtl(drone_id=None) async

Return-to-launch for a single drone or all drones.

formation(pattern='v', spacing=15.0, heading=0.0, center=None, altitude=None) async

Move all drones into a formation pattern.

Parameters:

Name Type Description Default
pattern str

"v", "line", "triangle", "circle", "grid", or "orbit".

'v'
spacing float

Distance in meters between drones.

15.0
heading float

Heading in degrees for the formation.

0.0
center tuple[float, float] | None

(lat, lon) center point; defaults to first drone's position.

None
altitude float | None

Altitude in meters; defaults to config default.

None

rotate(degrees=360.0, duration_s=30.0, spacing=15.0) async

Rotate the current formation around its centroid.

Sends a series of formation commands at different headings to create a smooth rotation animation. Useful for demos and shows.

Parameters:

Name Type Description Default
degrees float

Total rotation in degrees (default 360 = full turn).

360.0
duration_s float

Time in seconds for the full rotation.

30.0
spacing float

Formation spacing in meters.

15.0

sweep(bounds, altitude=None) async

Execute an area-sweep mission.

Parameters:

Name Type Description Default
bounds list[tuple[float, float]]

[(sw_lat, sw_lon), (ne_lat, ne_lon)] corners.

required
altitude float | None

Altitude in meters; defaults to config default.

None

replan_on_loss(lost_drone_id)

Redistribute a lost drone's waypoints optimally across active drones.

Uses the Hungarian algorithm via :func:~drone_swarm.allocation.replan_optimal when scipy is available, falling back to nearest-neighbor assignment otherwise.

simulate(n_drones=3, *, config=None, sitl_path=None, base_port=5760, home=(35.363261, -117.669056), spacing_m=5.0, speedup=1, auto_connect=True) async classmethod

Create a ready-to-fly simulated swarm.

Launches n_drones ArduPilot SITL instances, registers them as sim-0, sim-1, ... sim-{n-1}, and optionally connects.

Returns (swarm, sim_harness) so the caller can later call await sim_harness.stop() to tear everything down.

Example::

swarm, sim = await Swarm.simulate(n_drones=3)
await swarm.takeoff(altitude=10)
# ... fly ...
await swarm.shutdown()
await sim.stop()