# -*- coding: utf-8 -*-
"""
Utility functions.
@author: Paul T. Grogan <paul.grogan@asu.edu>
"""
import re
from typing import List, Union
import numpy as np
from numba import njit
import geopandas as gpd
from pyproj import Transformer
from sgp4.conveniences import sat_epoch_datetime
from sgp4.api import Satrec
from shapely import Geometry, make_valid, simplify
from shapely.geometry import (
Point,
Polygon,
MultiPolygon,
GeometryCollection,
LineString,
)
from shapely.ops import split, transform
from skyfield.api import wgs84
from skyfield.framelib import itrs
from skyfield.toposlib import GeographicPosition
from skyfield.positionlib import Geocentric
from spiceypy.spiceypy import edlimb, inelpl, nvp2pl, recgeo, surfpt
from spiceypy.utils.exceptions import NotFoundError
from . import constants
from . import config
[docs]
@njit
def mean_anomaly_to_true_anomaly(mean_anomaly: float, eccentricity: float = 0) -> float:
"""
Converts mean anomaly to true anomaly.
Args:
mean_anomaly (float): The mean anomaly (degrees).
true_anomaly (float): The orbit eccentricity.
Returns:
float: The true anomaly (degrees).
"""
mean_anomaly_rad = np.radians(mean_anomaly)
true_anomaly_rad = (
mean_anomaly_rad
+ (2 * eccentricity - (1 / 4) * eccentricity**3) * np.sin(mean_anomaly_rad)
+ (5 / 4) * eccentricity**2 * np.sin(2 * mean_anomaly_rad)
+ (13 / 12) * eccentricity**3 * np.sin(3 * mean_anomaly_rad)
)
return np.degrees(true_anomaly_rad)
[docs]
@njit
def true_anomaly_to_mean_anomaly(true_anomaly: float, eccentricity: float = 0) -> float:
"""
Converts true anomaly to mean anomaly.
Args:
true_anomaly (float): The true anomaly (degrees).
eccentricity (float): The orbit eccentricity.
Returns:
float: The mean anomaly (degrees).
"""
true_anomaly_rad = np.radians(true_anomaly)
mean_anomaly_rad = (
true_anomaly_rad
- 2 * eccentricity * np.sin(true_anomaly_rad)
+ ((3 / 4) * eccentricity**2 + (1 / 8) * eccentricity**4)
* np.sin(2 * true_anomaly_rad)
- (1 / 3) * eccentricity**3 * np.sin(3 * true_anomaly_rad)
+ (5 / 32) * eccentricity**4 * np.sin(4 * true_anomaly_rad)
)
return np.degrees(mean_anomaly_rad)
[docs]
@njit
def compute_number_samples(distance: float) -> int:
"""
Compute the number of global samples required to achieve a typical
sample distance (meters) assuming equal spacing.
Args:
distance (float): The typical distance between samples (meters).
Returns:
int: The number of global samples.
"""
# compute the angular distance of each sample (assuming mean sphere)
theta = distance / constants.EARTH_MEAN_RADIUS
# compute the distance from the center of earth to conic plane (assuming sphere)
radius = constants.EARTH_MEAN_RADIUS * np.cos(theta / 2)
# compute the distance from the conic plane to the surface (assuming sphere)
height = constants.EARTH_MEAN_RADIUS - radius
# compute the sperical cap area covered by the sample (assuming sphere)
# https://en.wikipedia.org/wiki/Spherical_cap
sample_area = 2 * np.pi * constants.EARTH_MEAN_RADIUS * height
# return the fraction of earth-to-sample area
return int(constants.EARTH_SURFACE_AREA / sample_area)
[docs]
@njit
def swath_width_to_field_of_regard(
altitude: float, swath_width: float, elevation: float = 0
) -> float:
"""
Fast conversion from swath width to field of regard.
Args:
altitude (float): Altitude (meters) above WGS 84 datum for the observing instrument.
swath_width (float): Observation diameter (meters) at specified elevation.
elevation (float): Elevation (meters) above WGS 84 datum to observe.
Returns:
float: The field of regard (degrees).
"""
# rho is the angular radius of the earth viewed by the satellite
sin_rho = (constants.EARTH_MEAN_RADIUS + elevation) / (
constants.EARTH_MEAN_RADIUS + altitude
)
# lambda is the Earth central angle
sin_lambda = np.sin((swath_width / 2) / (constants.EARTH_MEAN_RADIUS + elevation))
# eta is the angular radius of the region viewable by the satellite
tan_eta = sin_rho * sin_lambda / (1 - sin_rho * np.cos(np.arcsin(sin_lambda)))
return np.degrees(2 * np.arctan(tan_eta))
[docs]
@njit
def swath_width_to_field_of_view(
altitude: float, swath_width: float, look_angle: float = 0, elevation: float = 0
) -> float:
"""
Fast conversion from swath width to field of view considering off-nadir pointing.
Args:
altitude (float): Altitude (meters) above WGS 84 datum for the observing instrument.
swath_width (float): Observation diameter (meters) at specified elevation.
look_angle (float): Off-nadir look angle (degrees) to observation center.
elevation (float): Elevation (meters) above WGS 84 datum to observe.
Returns:
float: The field of view (degrees).
"""
# rho is the angular radius of the earth viewed by the satellite
sin_rho = (constants.EARTH_MEAN_RADIUS + elevation) / (
constants.EARTH_MEAN_RADIUS + altitude
)
# eta is the angular radius from sub-satellite point to center of view
sin_eta = min(sin_rho, np.sin(np.radians(look_angle) / 2))
# epsilon is the satellite elevation from the center of view
cos_epsilon = sin_eta / sin_rho
# lambda is the Earth central angle to the center of view
_lambda = np.pi / 2 - np.arcsin(sin_eta) - np.arccos(cos_epsilon)
sin_lambda_1 = np.sin(
_lambda - (swath_width / 2) / (constants.EARTH_MEAN_RADIUS + elevation)
)
sin_lambda_2 = np.sin(
_lambda + (swath_width / 2) / (constants.EARTH_MEAN_RADIUS + elevation)
)
# eta is the angular radius of the region viewable by the satellite
tan_eta_1 = sin_rho * sin_lambda_1 / (1 - sin_rho * np.cos(np.arcsin(sin_lambda_1)))
tan_eta_2 = sin_rho * sin_lambda_2 / (1 - sin_rho * np.cos(np.arcsin(sin_lambda_2)))
return np.degrees(np.arctan(tan_eta_2) - np.arctan(tan_eta_1))
[docs]
@njit
def field_of_regard_to_swath_width(
altitude: float, field_of_regard: float, elevation: float = 0
) -> float:
"""
Fast conversion from field of regard to swath width.
Args:
altitude (float): Altitude (meters) above WGS 84 datum for the observing instrument.
field_of_regard (float): Angular width (degrees) of observation.
elevation (float): Elevation (meters) above WGS 84 datum to observe.
Returns:
float: The observation diameter (meters) at the specified elevation.
"""
# rho is the angular radius of the earth viewed by the satellite
sin_rho = (constants.EARTH_MEAN_RADIUS + elevation) / (
constants.EARTH_MEAN_RADIUS + altitude
)
# eta is the angular radius of the region viewable by the satellite
sin_eta = min(sin_rho, np.sin(np.radians(field_of_regard) / 2))
# epsilon is the min satellite elevation for obs (grazing angle)
cos_epsilon = sin_eta / sin_rho
# lambda is the Earth central angle
_lambda = np.pi / 2 - np.arcsin(sin_eta) - np.arccos(cos_epsilon)
return 2 * (constants.EARTH_MEAN_RADIUS + elevation) * _lambda
[docs]
@njit
def compute_field_of_regard(
altitude: float, min_elevation_angle: float, elevation: float = 0
) -> float:
"""
Fast computation of field of regard for observation with a minimum altitude angle.
Args:
altitude (float): Altitude (meters) above WGS 84 datum for the observing instrument.
min_elevation_angle (float): The minimum elevation angle (degrees) for observation.
elevation (float): Elevation (meters) above WGS 84 datum to observe.
Returns:
float: Angular width (degrees) of observation.
"""
# rho is the angular radius of the earth viewed by the satellite
sin_rho = (constants.EARTH_MEAN_RADIUS + elevation) / (
constants.EARTH_MEAN_RADIUS + altitude
)
# epsilon is the min satellite elevation for obs (grazing angle)
cos_epsilon = np.cos(np.radians(min_elevation_angle))
# eta is the angular radius of the region viewable by the satellite
sin_eta = sin_rho * cos_epsilon
return np.degrees(np.arcsin(sin_eta) * 2)
[docs]
@njit
def compute_min_elevation_angle(
altitude: float, field_of_regard: float, elevation: float = 0
) -> float:
"""
Fast computation of minimum elevation angle required to observe a point.
Args:
altitude (float): Altitude (meters) above WGS 84 datum for the observing instrument.
field_of_regard (float): Angular width (degrees) of observation.
elevation (float): Elevation (meters) above WGS 84 datum to observe.
Returns:
float: The minimum elevation angle (degrees) for observation.
"""
# eta is the angular radius of the region viewable by the satellite
sin_eta = np.sin(np.radians(field_of_regard) / 2)
# rho is the angular radius of the earth viewed by the satellite
sin_rho = (constants.EARTH_MEAN_RADIUS + elevation) / (
constants.EARTH_MEAN_RADIUS + altitude
)
# epsilon is the min satellite elevation for obs (grazing angle)
cos_epsilon = sin_eta / sin_rho
if cos_epsilon > 1:
return 0
return np.degrees(np.arccos(cos_epsilon))
[docs]
@njit
def compute_orbit_period(altitude: float) -> float:
"""
Fast computation of approximate orbital period.
Args:
altitude (float): Altitude (meters) above WGS 84 datum for the observing instrument.
Returns:
float: The orbital period (seconds).
"""
semimajor_axis = constants.EARTH_MEAN_RADIUS + altitude
mean_motion_rad_s = np.sqrt(constants.EARTH_MU / semimajor_axis**3)
return 2 * np.pi / mean_motion_rad_s
[docs]
@njit
def compute_max_access_time(altitude: float, min_elevation_angle: float) -> float:
"""
Fast computation of maximum access time to observe a point.
Args:
altitude (float): Altitude (meters) above WGS 84 datum for the observing instrument.
min_elevation_angle (float): Minimum elevation angle (degrees) for observation.
Returns:
float: The maximum access time (seconds) for observation.
"""
orbital_distance = (constants.EARTH_MEAN_RADIUS + altitude) * (
np.pi - 2 * np.radians(min_elevation_angle)
)
orbital_velocity = np.sqrt(
constants.EARTH_MU / (constants.EARTH_MEAN_RADIUS + altitude)
)
return orbital_distance / orbital_velocity
@njit
def compute_ground_velocity(altitude: float, inclination: float) -> float:
"""
Fast computation of mean ground velocity for a nadir-pointing instrument.
Args:
altitude (float): Altitude (meters) above WGS 84 datum for the observing instrument.
inclination (float): Inclination (degrees) of the observing instrument orbit.
Returns:
float: The access time (seconds) for observation.
"""
semimajor_axis = constants.EARTH_MEAN_RADIUS + altitude
mean_motion_rad_s = np.sqrt(constants.EARTH_MU / semimajor_axis**3)
return constants.EARTH_MEAN_RADIUS * (
mean_motion_rad_s
- (2 * np.pi * np.cos(np.degrees(inclination)) / constants.EARTH_SIDEREAL_DAY_S)
)
@njit
def along_track_distance_to_access_time(
altitude: float, inclination: float, along_track: float
) -> float:
"""
Fast computation of mean access time for a specified along track distance.
Args:
altitude (float): Altitude (meters) above WGS 84 datum for the observing instrument.
inclination (float): Inclination (degrees) of the observing instrument orbit.
along_track (float): Along track distance (meters) observed during access.
Returns:
float: The access time (seconds) for observation.
"""
semimajor_axis = constants.EARTH_MEAN_RADIUS + altitude
mean_motion_rad_s = np.sqrt(constants.EARTH_MU / semimajor_axis**3)
ground_velocity = constants.EARTH_MEAN_RADIUS * (
mean_motion_rad_s
- (2 * np.pi * np.cos(np.degrees(inclination)) / constants.EARTH_SIDEREAL_DAY_S)
)
return along_track / ground_velocity
@njit
def access_time_to_along_track_distance(
altitude: float, inclination: float, access_time: float
) -> float:
"""
Fast computation of along track distance for a specified access time.
Args:
altitude (float): Altitude (meters) above WGS 84 datum for the observing instrument.
inclination (float): Inclination (degrees) of the observing instrument orbit.
access_time (float): Access time (seconds) during observation.
Returns:
float: The observation along track distance (meters).
"""
semimajor_axis = constants.EARTH_MEAN_RADIUS + altitude
mean_motion_rad_s = np.sqrt(constants.EARTH_MU / semimajor_axis**3)
ground_velocity = constants.EARTH_MEAN_RADIUS * (
mean_motion_rad_s
- (2 * np.pi * np.cos(np.degrees(inclination)) / constants.EARTH_SIDEREAL_DAY_S)
)
return ground_velocity * access_time
def compute_projected_ray_position(
orbit_track: Geocentric,
cross_track_field_of_view: float,
along_track_field_of_view: float,
roll_angle: float = 0,
pitch_angle: float = 0,
is_rectangular: bool = False,
angle: float = 0,
elevation: float = 0,
) -> GeographicPosition:
"""
Get the location of a projected ray from an instrument.
Args:
orbit_track (skyfield.positionlib.Geocentric): the satellite orbit track.
cross_track_field_of_view (float): the instrument cross-track
(orthogonal to velocity vector) field of view (degrees).
along_track_field_of_view (float): the instrument along-track
(parallel to velocity vector) field of view (degrees).
roll_angle (float): the instrument roll (right-hand about
velocity vector) angle (degrees).
pitch_angle (float): the instrument pitch (right-hand about
orbit normal vector) angle (degrees).
is_rectangular (bool): `True` if the instrument view has a rectangular
shape (otherwise elliptical).
angle (float): ray angle (degrees) counterclockwise from right-hand cross-track
direction about the instrument field of view.
elevation (float): The elevation (meters) at which project the footprint.
Returns:
(skyfield.toposlib.GeographicPosition): the geographic position of the projected ray
"""
# extract earth-fixed position and velocity
position, velocity = orbit_track.frame_xyz_and_velocity(itrs)
# velocity unit vector
v = np.divide(velocity.m_per_s, np.linalg.norm(velocity.m_per_s, axis=0))
# binormal unit vector
b = np.divide(-position.m, np.linalg.norm(position.m, axis=0))
# normal unit vector
if len(np.shape(position.m)) > 1:
n = np.cross(v, b, 0, 0, -1).T
else:
n = np.cross(v, b)
# construct projected ray
if is_rectangular:
# find orientation of rectangle corner
theta = np.arctan(along_track_field_of_view / cross_track_field_of_view)
# along track half width
tan_a_2 = np.tan(np.radians(along_track_field_of_view / 2))
# cross track half width
tan_c_2 = np.tan(np.radians(cross_track_field_of_view / 2))
# compose the ray with different equations for each side
ray = (
b
+ v * np.tan(np.radians(pitch_angle))
+ n * np.tan(np.radians(roll_angle))
+ v
* (
tan_a_2
if theta <= angle <= np.pi - theta
else (
-tan_a_2
if np.pi + theta <= angle <= 2 * np.pi - theta
else (
tan_c_2 * np.tan(angle)
if angle < theta
else (
tan_c_2 * np.tan(np.pi - angle)
if angle < np.pi
else (
-tan_c_2 * np.tan(angle - np.pi)
if angle < np.pi + theta
else -tan_c_2 * np.tan(2 * np.pi - angle)
)
)
)
)
)
+ n
* (
tan_c_2
if angle <= theta or angle >= 2 * np.pi - theta
else (
-tan_c_2
if np.pi - theta <= angle <= np.pi + theta
else (
tan_a_2 * np.tan(np.pi / 2 - angle)
if angle < np.pi / 2
else (
-tan_a_2 * np.tan(angle - np.pi / 2)
if angle < np.pi - theta
else (
-tan_a_2 * np.tan(3 * np.pi / 2 - angle)
if angle < 3 * np.pi / 2
else tan_a_2 * np.tan(angle - 3 * np.pi / 2)
)
)
)
)
)
)
else:
ray = (
b
+ v * np.tan(np.radians(pitch_angle))
+ n * np.tan(np.radians(roll_angle))
+ v * np.sin(angle) * np.tan(np.radians(along_track_field_of_view / 2))
+ n * np.cos(angle) * np.tan(np.radians(cross_track_field_of_view / 2))
)
geos = np.zeros_like(position.m)
for i in range(np.size(geos, axis=1)) if geos.ndim > 1 else [-1]:
_position = position.m[:, i].copy() if i >= 0 else position.m
_ray = ray[:, i].copy() if i >= 0 else ray
# find the intersection of the ray and the WGS 84 geoid
try:
pt = surfpt(
_position,
_ray,
constants.EARTH_EQUATORIAL_RADIUS + elevation,
constants.EARTH_EQUATORIAL_RADIUS + elevation,
constants.EARTH_POLAR_RADIUS + elevation,
)
geo = recgeo(
pt,
constants.EARTH_EQUATORIAL_RADIUS,
constants.EARTH_FLATTENING,
)
if i >= 0:
geos[:, i] = geo
else:
geos[:] = geo
except NotFoundError:
# projected point does not fall on the WGS 84 geoid surface
# compute the observable limb ellipse for WGS 84 geoid
limb = edlimb(
constants.EARTH_EQUATORIAL_RADIUS + elevation,
constants.EARTH_EQUATORIAL_RADIUS + elevation,
constants.EARTH_POLAR_RADIUS + elevation,
_position,
)
# find the two intersection points between orthogonal plane and limb ellipse
_v = v[:, i].copy() if i >= 0 else v
_n = n[:, i].copy() if i >= 0 else n
_, pt_1, pt_2 = inelpl(
limb,
nvp2pl(
_v * np.sin(np.pi / 2 + angle) + _n * np.cos(np.pi / 2 + angle),
_position,
),
)
# compute the angles between the ray and limb intersection points
angle_1 = np.arccos(
np.dot(_ray, pt_1) / np.linalg.norm(_ray) / np.linalg.norm(pt_1)
)
angle_2 = np.arccos(
np.dot(_ray, pt_2) / np.linalg.norm(_ray) / np.linalg.norm(pt_2)
)
# use the limb intersection point closer to the ray
limb_pt = pt_1 if angle_1 <= angle_2 else pt_2
limb_geo = recgeo(
limb_pt,
constants.EARTH_EQUATORIAL_RADIUS,
constants.EARTH_FLATTENING,
)
if i >= 0:
geos[:, i] = limb_geo
else:
geos[:] = limb_geo
# return resulting geographic position
if len(np.shape(geos)) > 1:
return wgs84.latlon(np.degrees(geos[1, :]), np.degrees(geos[0, :]), geos[2, :])
return wgs84.latlon(np.degrees(geos[1]), np.degrees(geos[0]), geos[2])
def compute_limb(
orbit_track: Geocentric,
number_points: int = 16,
elevation: float = 0,
) -> Union[Geometry, List[Geometry]]:
"""
Compute the instanteous limb.
Args:
orbit_track (skyfield.positionlib.Geocentric): The satellite position/velocity.
number_points (int): The required number of polygon points to generate.
elevation (float): The elevation (meters) at which project the limb.
Returns:
Union[shapely.Geometry, List[shapely.Geometry]: The limb(s).
"""
position, _ = orbit_track.frame_xyz_and_velocity(itrs)
polygons = [None] * np.size(position.m, axis=1) if position.m.ndim > 1 else None
for i in range(len(polygons)) if position.m.ndim > 1 else [-1]:
_position = position.m[:, i].copy() if i >= 0 else position.m
limb = edlimb(
constants.EARTH_EQUATORIAL_RADIUS + elevation,
constants.EARTH_EQUATORIAL_RADIUS + elevation,
constants.EARTH_POLAR_RADIUS + elevation,
_position,
)
polygon = project_polygon_to_elevation(
split_polygon(
Polygon(
[
Point(np.degrees(g[0]), np.degrees(g[1]))
for p in [
limb.center
+ np.cos(i) * limb.semi_major
+ np.sin(i) * limb.semi_minor
for i in np.linspace(0, np.pi * 2, number_points)
]
for g in [
recgeo(
p,
constants.EARTH_EQUATORIAL_RADIUS,
constants.EARTH_FLATTENING,
)
]
]
)
),
elevation,
)
if i >= 0:
polygons[i] = polygon
else:
polygons = polygon
return polygons
def buffer_target(
geometry: Geometry,
altitude: float,
inclination: float,
field_of_regard: float,
time_step: float,
distance_crs: str = "EPSG:4087",
distance_scaling: float = 1.0,
):
"""
Buffers a target geometry to support culling operations. Selects a buffer distance
equal to the distance traveled in one time step plus half of the field of regard
swath width. Simplifies geometries to distance tolerances within 5% of the buffer distance.
Args:
geometry (shapely.Geometry): The target geometry (with EPSG:4326 coordinates) to buffer.
altitude (float): The spacecraft orbit altitude (meters).
inclination (float): The spacecraft orbit inclination (degrees).
field_of_regard (float): The spacecraft instrument field of regard (degrees).
time_step (float): The simulation time step (seconds).
distance_crs (str): The coordinate reference system in which to perform
distance calculations (default: EPSG:4087).
distance_scaling (float): A multiplicative scaling factor to adjust the buffer
distance (default: 1.0).
Returns:
Union[shapely.geometry.Polygon, shapely.geometry.MultiPolygon]: The buffered geometry.
"""
to_crs = Transformer.from_crs("EPSG:4326", distance_crs, always_xy=True)
from_crs = Transformer.from_crs(distance_crs, "EPSG:4326", always_xy=True)
swath_width = field_of_regard_to_swath_width(altitude, field_of_regard)
ground_distance = compute_ground_velocity(altitude, inclination) * time_step
distance = (ground_distance + swath_width / 2) * distance_scaling
return split_polygon(
transform(
from_crs.transform, transform(to_crs.transform, geometry).buffer(distance)
)
)
[docs]
def project_polygon_to_elevation(
polygon: Union[Polygon, MultiPolygon], elevation: float
) -> Union[Polygon, MultiPolygon]:
"""
Projects a polygon to a specified elevation (z-coordinate).
Args:
polygon (Polygon or MultiPolygon): The polygon to project.
elevation (float): The elevation (meters) above the WGS 84 geoid.
Returns:
Polygon or MultiPolygon: The projected polygon.
"""
if isinstance(polygon, Polygon):
return Polygon(
[(p[0], p[1], elevation) for p in polygon.exterior.coords],
[[(p[0], p[1], elevation) for p in i.coords] for i in polygon.interiors],
)
return MultiPolygon(
[project_polygon_to_elevation(g, elevation) for g in polygon.geoms]
)
def _wrap_polygon_over_north_pole(
polygon: Union[Polygon, MultiPolygon],
) -> Union[Polygon, MultiPolygon]:
"""
Wraps polygon coordinates over the North pole. Due to buffering and projection,
sometimes latitudes exceed 90 degrees. This method wraps them to the correct
latitude between -90 and 90 degrees and adjusts the longitude by 180 degrees.
This method requires a polygon above 90 degrees latitude to be only on one
side of the prime meridian.
Note: this method only changes coordinates: it does not create a MultiPolygon.
Args:
polygon (Polygon or MultiPolygon): The polygon to wrap.
Returns:
Polygon, or MultiPolygon: The wrapped polygon.
"""
if isinstance(polygon, Polygon):
if all(c[1] <= 90 for c in polygon.exterior.coords):
# no wrapping necessary
return polygon
# map latitudes from [90, 180) to [90, -90), adjusting longitude by 180 degrees
lat_shift = 180 if all(c[0] <= 0 for c in polygon.exterior.coords) else -180
pgon = Polygon(
[
[
c[0] + lat_shift if c[1] >= 90 else c[0],
180 - c[1] if c[1] >= 90 else c[1],
]
for c in polygon.exterior.coords
],
[
[
[
c[0] + lat_shift if c[1] >= 90 else c[0],
180 - c[1] if c[1] >= 90 else c[1],
]
for c in i.coords
]
for i in polygon.interiors
],
)
# give up and return original polygon if invalid
if not pgon.is_valid:
return polygon
return pgon
if isinstance(polygon, MultiPolygon):
# recursive call for each polygon
polygons = [_wrap_polygon_over_north_pole(p) for p in polygon.geoms]
return MultiPolygon(
[
g
for p in polygons
for g in (p.geoms if isinstance(p, MultiPolygon) else [p])
]
)
raise ValueError("Unknown geometry: " + str(type(polygon)))
def _split_polygon_north_pole(
polygon: Union[Polygon, MultiPolygon],
) -> Union[Polygon, MultiPolygon]:
"""
Splits a Polygon into a MultiPolygon if it crosses north pole.
Args:
polygon (Polygon or MultiPolygon): The polygon to split.
Returns:
Polygon, or MultiPolygon: The split polygon.
"""
if isinstance(polygon, Polygon):
if all(c[1] <= 90 for c in polygon.exterior.coords):
# no splitting necessary
return polygon
# split polygon along north pole
parts = split(polygon, LineString([(-360, 90), (360, 90)]))
# check and split part over prime meridian if necessary
for part in parts.geoms:
if part.crosses(LineString([(0, 90), (0, 180)])):
parts = GeometryCollection(
[g for g in parts.geoms if g != part]
+ [g for g in split(part, LineString([(0, 90), (0, 180)])).geoms]
)
# convert to a multi polygon
if isinstance(parts, GeometryCollection):
parts = _convert_collection_to_polygon(parts)
# return polygon with components wrapped over north pole
return _wrap_polygon_over_north_pole(parts)
if isinstance(polygon, MultiPolygon):
# recursive call for each polygon
pgons = [_split_polygon_north_pole(p) for p in polygon.geoms]
return MultiPolygon(
[
g
for p in pgons
for g in (p.geoms if isinstance(p, MultiPolygon) else [p])
]
)
raise ValueError("Unknown geometry: " + str(type(polygon)))
def _wrap_polygon_over_south_pole(
polygon: Union[Polygon, MultiPolygon],
) -> Union[Polygon, MultiPolygon]:
"""
Wraps polygon coordinates over the South pole. Due to buffering and projection,
sometimes latitudes exceed -90 degrees. This method wraps them to the correct
latitude between -90 and 90 degrees and adjusts the longitude by 180 degrees.
This method requires a polygon above 90 degrees latitude to be only on one
side of the prime meridian.
Note: this method only changes coordinates: it does not create a MultiPolygon.
Args:
polygon (Polygon or MultiPolygon): The polygon to wrap.
Returns:
Polygon, or MultiPolygon: The wrapped polygon.
"""
if isinstance(polygon, Polygon):
if all(c[1] >= -90 for c in polygon.exterior.coords):
# no splitting necessary
return polygon
# map latitudes from [-90, -180) to [-90, 90), adjusting longitude by 180 degrees
lat_shift = 180 if all(c[0] <= 0 for c in polygon.exterior.coords) else -180
pgon = Polygon(
[
[
c[0] + lat_shift if c[1] <= -90 else c[0],
-180 - c[1] if c[1] <= -90 else c[1],
]
for c in polygon.exterior.coords
],
[
[
[
(c[0] + lat_shift if c[1] <= -90 else c[0],),
-180 - c[1] if c[1] <= -90 else c[1],
]
for c in i.coords
]
for i in polygon.interiors
],
)
# give up and return original polygon if invalid
if not pgon.is_valid:
return polygon
return pgon
if isinstance(polygon, MultiPolygon):
# recursive call for each polygon
polygons = [_wrap_polygon_over_south_pole(p) for p in polygon.geoms]
return MultiPolygon(
[
g
for p in polygons
for g in (p.geoms if isinstance(p, MultiPolygon) else [p])
]
)
raise ValueError("Unknown geometry: " + str(type(polygon)))
def _split_polygon_south_pole(
polygon: Union[Polygon, MultiPolygon],
) -> Union[Polygon, MultiPolygon]:
"""
Splits a Polygon into a MultiPolygon if it crosses south pole.
Args:
polygon (Polygon or MultiPolygon): The polygon to split.
Returns:
Polygon, or MultiPolygon: The split polygon.
"""
if isinstance(polygon, Polygon):
lat = np.array([c[1] for c in polygon.exterior.coords])
if np.all(lat >= -90):
return polygon
# split polygon along south pole
parts = split(polygon, LineString([(-360, -90), (360, -90)]))
# check and split part over prime meridian if necessary
for part in parts.geoms:
if part.crosses(LineString([(0, -90), (0, -180)])):
parts = GeometryCollection(
[g for g in parts.geoms if g != part]
+ [g for g in split(part, LineString([(0, -90), (0, -180)])).geoms]
)
# convert to a multi polygon
if isinstance(parts, GeometryCollection):
parts = _convert_collection_to_polygon(parts)
# return polygon with components wrapped over south pole
return _wrap_polygon_over_south_pole(parts)
if isinstance(polygon, MultiPolygon):
# recursive call for each polygon
pgons = [_split_polygon_south_pole(p) for p in polygon.geoms]
return MultiPolygon(
[
g
for p in pgons
for g in (p.geoms if isinstance(p, MultiPolygon) else [p])
]
)
raise ValueError("Unknown geometry: " + str(type(polygon)))
def _wrap_polygon_over_antimeridian(
polygon: Union[Polygon, MultiPolygon],
) -> Union[Polygon, MultiPolygon]:
"""
Wraps polygon coordinates over the antimeridian. Due to buffering and projection,
sometimes longitudes exceed 180 degrees. This method wraps them to
the correct longitude between -180 and 180 degrees.
Note: this method only changes coordinates: it does not create a MultiPolygon.
Args:
polygon (Polygon or MultiPolygon): The polygon to wrap.
Returns:
Polygon, or MultiPolygon: The wrapped polygon.
"""
if isinstance(polygon, Polygon):
if all(c[0] >= -180 and c[0] <= 180 for c in polygon.exterior.coords):
# no wrapping necessary
return polygon
if all(c[0] <= -180 for c in polygon.exterior.coords):
# map longitudes from (-540, -180] to (-180, 180]
pgon = Polygon(
[[c[0] + 360, c[1]] for c in polygon.exterior.coords],
[[[c[0] + 360, c[1]] for c in i.coords] for i in polygon.interiors],
)
if all(c[0] >= 180 for c in polygon.exterior.coords):
# map longitudes from [180, 540) to [-180, 180)
pgon = Polygon(
[[c[0] - 360, c[1]] for c in polygon.exterior.coords],
[[[c[0] - 360, c[1]] for c in i.coords] for i in polygon.interiors],
)
# give up and return original polygon if invalid
if not pgon.is_valid:
return polygon
return pgon
if isinstance(polygon, MultiPolygon):
# recursive call for each polygon
pgons = [_wrap_polygon_over_antimeridian(p) for p in polygon.geoms]
return MultiPolygon(
[
g
for p in pgons
for g in (p.geoms if isinstance(p, MultiPolygon) else [p])
]
)
raise ValueError("Unknown geometry: " + str(type(polygon)))
def _convert_collection_to_polygon(
collection: GeometryCollection,
) -> Union[Polygon, MultiPolygon]:
"""
Converts a GeometryCollection to a Polygon or MultiPolygon. Quick clipping
can create dirty results with points or lines on boundaries. This method
drops and lines or points from a GeometryCollection to return only the
Polygon or MultiPolygon geometry.
Args:
polygon (Polygon or MultiPolygon): The polygon to convert.
Returns:
Polygon, or MultiPolygon: The converted polygon.
"""
pgons = [p for p in collection.geoms if isinstance(p, Polygon)] + [
p for g in collection.geoms if isinstance(g, MultiPolygon) for p in g.geoms
]
if len(pgons) == 1:
return pgons[0]
return MultiPolygon(pgons)
def _split_polygon_antimeridian(
polygon: Union[Polygon, MultiPolygon],
) -> Union[Polygon, MultiPolygon]:
"""
Splits a Polygon into a MultiPolygon if it crosses the anti-meridian after
wrapping its coordinates using `wrap_coordinates_antimeridian`. Note: this
function only supports polygons that span LESS than 360 degrees longitude.
Args:
polygon (Polygon or MultiPolygon): The polygon to split.
Returns:
Polygon, or MultiPolygon: The split polygon.
"""
if isinstance(polygon, Polygon):
lon = np.array([c[0] for c in polygon.exterior.coords])
# check if any longitudes cross the anti-meridian
# (adjacent coordinate longitude differs by more than 180 degrees)
if all(np.abs(np.diff(lon)) < 180):
return polygon
# check if this polygon contains a pole
if Polygon(zip(np.cos(np.radians(lon)), np.sin(np.radians(lon)))).contains(
Point(0, 0)
):
# extract and sort coords by longitude
coords = polygon.exterior.coords[0:-1]
coords.sort(key=lambda r: r[0])
# determine if contains north or south pole based on sign of mean latitude
n_s = 1 if np.array(coords)[:, 1].mean() > 0 else -1
# interpolate latitude at antimeridian
lat = np.interp(
180, [coords[-1][0], coords[0][0] + 180], [coords[-1][1], coords[0][1]]
)
# reconstruct polygon (ccw) with added coords on antimeridian
# TODO potential problem if provided polygon has z-dimension
pgon = Polygon(
[(-180, 90 * n_s), (-180, lat)]
+ coords
+ [(180, lat), (180, 90 * n_s), (-180, 90 * n_s)],
polygon.interiors,
)
# return polygon split down prime meridian to improve handling
parts = split(pgon, LineString([(0, -180), (0, 180)]))
# convert to multi polygon
if isinstance(parts, GeometryCollection):
parts = _convert_collection_to_polygon(parts)
return parts
# find anti-meridian crossings and calculate shift direction
# coords from W -> E (shift < 0) will add 360 degrees to E component
# coords from E -> W (shift > 0) will subtract 360 degrees from W component
shift = np.insert(np.cumsum(np.around(np.diff(lon) / 360)), 0, 0)
pgon = Polygon(
[
(c[0] - 360 * shift[i], c[1])
for i, c in enumerate(polygon.exterior.coords)
],
[
[
(
ic[0]
- 360 * np.interp(ic[0], np.sort(lon), shift[np.argsort(lon)]),
ic[1],
)
for ic in i.coords
]
for i in polygon.interiors
],
)
# split along the anti-meridian (-180 for shift > 0; 180 for shift < 0)
shift_dir = -180 if shift.max() >= 1 else 180
parts = split(pgon, LineString([(shift_dir, -180), (shift_dir, 180)]))
# convert to multi polygon
if isinstance(parts, GeometryCollection):
parts = _convert_collection_to_polygon(parts)
# return polygon with components wrapped over anti-meridian
return _wrap_polygon_over_antimeridian(parts)
if isinstance(polygon, MultiPolygon):
# recursive call for each polygon
pgons = [_split_polygon_antimeridian(p) for p in polygon.geoms]
return MultiPolygon(
[
g
for p in pgons
for g in (p.geoms if isinstance(p, MultiPolygon) else [p])
]
)
raise ValueError("Unknown geometry: " + str(type(polygon)))
[docs]
def split_polygon(
polygon: Union[Polygon, MultiPolygon],
) -> Union[Polygon, MultiPolygon]:
"""
Splits a Polygon into a MultiPolygon if it crosses the anti-meridian
(180 degrees longitude), exceeds the north pole (90 degrees latitude), or
exceeds the south pole (-90 degrees latitude). Note: this function
only supports polygons that span LESS than 360 degrees longitude.
Args:
polygon (Polygon or MultiPolygon): The polygon to split.
Returns:
Polygon, or MultiPolygon: The split polygon.
"""
polygon = _split_polygon_north_pole(
_split_polygon_south_pole(_split_polygon_antimeridian(polygon))
)
# invalid polygons can arise from narrow sensor geometries in polar regions
if not polygon.is_valid:
# try to fix geometry
polygon = make_valid(polygon)
if isinstance(polygon, GeometryCollection):
polygon = _convert_collection_to_polygon(polygon)
return polygon
[docs]
def normalize_geometry(
geometry: Union[Polygon, MultiPolygon, gpd.GeoDataFrame],
) -> gpd.GeoDataFrame:
"""
Normalize geometry to a GeoDataFrame with antimeridian wrapping.
Args:
geometry (geopandas.GeoDataFrame, geopandas.GeoSeries, Polygon, or MultiPolygon): The geometry to normalize.
Returns:
geopandas.GeoDataFrame: The normalized geometry.
"""
if isinstance(geometry, (Polygon, MultiPolygon)):
if not geometry.is_valid:
raise ValueError("Geometry is not a valid Polygon or MultiPolygon.")
geometry = gpd.GeoDataFrame(geometry=gpd.GeoSeries([geometry]), crs="EPSG:4326")
elif isinstance(geometry, gpd.GeoSeries):
geometry = gpd.GeoDataFrame(geometry=geometry, crs="EPSG:4326")
if isinstance(geometry, gpd.GeoDataFrame):
geometry["geometry"] = geometry.apply(
lambda r: split_polygon(r.geometry),
axis=1,
)
return geometry
def zero_pad(object_name: str, max_number: int, current_number: int) -> str:
"""
Uses length of max_number to zero pad allowing for alphanumeric sorting.
Args:
object_name (str): Object name, to be concatenated with zero padded number.
max_number (int): Maximum number, utilized as reference for zero padding.
current_number (int): Index number to be zero padded.
Returns:
str: The object name with zero padded number appended.
"""
max_length = len(str(max_number))
return object_name + " " + str(current_number).zfill(max_length)
def is_even_length_list(v: List) -> List:
"""
Validator to check for even-length lists.
"""
if len(v) % 2 == 1:
raise ValueError(f"{v} does not have even length")
return v
def is_valid_tle(v: List[str]) -> List[str]:
"""
Validate the two line element set.
"""
for i in range(0, len(v), 2):
# based on orekit's TLE.isFormatOK function
if len(v[i]) != 69:
raise ValueError(f"Invalid tle: line {i+1} incorrect length.")
if len(v[i + 1]) != 69:
raise ValueError(f"Invalid tle: line {i+2} incorrect length.")
line_1_pattern = (
r"1 [ 0-9A-HJ-NP-Z][ 0-9]{4}[A-Z] [ 0-9]{5}[ A-Z]{3} "
+ r"[ 0-9]{5}[.][ 0-9]{8} (?:(?:[ 0+-][.][ 0-9]{8})|(?: "
+ r"[ +-][.][ 0-9]{7})) [ +-][ 0-9]{5}[+-][ 0-9] "
+ r"[ +-][ 0-9]{5}[+-][ 0-9] [ 0-9] [ 0-9]{4}[ 0-9]"
)
if re.match(line_1_pattern, v[i]) is None:
raise ValueError(f"Invalid tle: line {i+1} does not match pattern.")
line_2_pattern = (
r"2 [ 0-9A-HJ-NP-Z][ 0-9]{4} [ 0-9]{3}[.][ 0-9]{4} "
+ r"[ 0-9]{3}[.][ 0-9]{4} [ 0-9]{7} [ 0-9]{3}[.][ 0-9]{4} "
+ r"[ 0-9]{3}[.][ 0-9]{4} [ 0-9]{2}[.][ 0-9]{13}[ 0-9]"
)
if re.match(line_2_pattern, v[i + 1]) is None:
raise ValueError(f"Invalid tle: line {i+2} does not match pattern.")
def checksum(line):
the_sum = 0
for j in range(68):
if line[j].isdigit():
the_sum += int(line[j])
elif line[j] == "-":
the_sum += 1
return the_sum % 10
if int(v[i][68]) != checksum(v[i]):
raise ValueError(f"Invalid tle: line {i+1} checksum failed.")
if int(v[i + 1][68]) != checksum(v[i + 1]):
raise ValueError(f"Invalid tle: line {1+2} checksum failed.")
return v
def is_chronological_tle(v: List[str]) -> List[str]:
"""
Validator to check for chronological TLEs.
"""
epochs = np.array(
[
sat_epoch_datetime(Satrec.twoline2rv(v[i], v[i + 1]))
for i in range(0, len(v), 2)
]
)
if not all(epochs[:-1] <= epochs[1:]):
raise ValueError(f"Invalid multi-tle: not in chronological order.")
return v