Source code for tatc.analysis.track

# -*- coding: utf-8 -*-
"""
Methods to generate coverage statistics.

@author: Paul T. Grogan <paul.grogan@asu.edu>
"""

from typing import List, Union, Optional
from datetime import datetime, timedelta
from enum import Enum

import pandas as pd
import numpy as np
import geopandas as gpd
from skyfield.api import wgs84
from skyfield.framelib import itrs
from skyfield.functions import angle_between

from shapely.geometry import (
    Polygon,
    MultiPolygon,
    Point,
    LineString,
)
from shapely.ops import clip_by_rect, split
import pyproj

from ..schemas.instrument import PointedInstrument
from ..schemas.satellite import Satellite
from ..utils import (
    buffer_footprint,
    buffer_target,
    field_of_regard_to_swath_width,
)
from ..constants import de421, EARTH_MEAN_RADIUS, timescale


def _get_empty_orbit_track() -> gpd.GeoDataFrame:
    """
    Gets an empty data frame for orbit track results.

    Returns:
        geopandas.GeoDataFrame: Empty data frame.
    """
    columns = {
        "time": pd.Series([], dtype="datetime64[ns, utc]"),
        "satellite": pd.Series([], dtype="str"),
        "instrument": pd.Series([], dtype="str"),
        "swath_width": pd.Series([], dtype="float"),
        "valid_obs": pd.Series([], dtype="bool"),
        "geometry": pd.Series([], dtype="object"),
    }
    return gpd.GeoDataFrame(columns, crs="EPSG:4326")


class OrbitCoordinate(str, Enum):
    """
    Enumeration of different orbit track coordinate systems.
    """

    WGS84 = "wgs84"
    ECEF = "ecef"
    ECI = "eci"


class OrbitOutput(str, Enum):
    """
    Enumeration of different orbit output options.
    """

    POSITION = "position"
    POSITION_VELOCITY = "velocity"


[docs] def collect_orbit_track( satellite: Satellite, times: List[datetime], instrument_index: int = 0, elevation: float = 0, mask: Optional[ Union[ Polygon, MultiPolygon, gpd.GeoDataFrame, gpd.GeoSeries, ] ] = None, coordinates: OrbitCoordinate = OrbitCoordinate.WGS84, orbit_output: OrbitOutput = OrbitOutput.POSITION, sat_sunlit: bool = False, solar_altaz: bool = False, solar_beta: bool = False, ) -> gpd.GeoDataFrame: """ Collect orbit track points for a satellite of interest. Args: satellite (Satellite): The observing satellite. times (typing.List[datetime.datetime]): The list of times to sample. instrument_index (int): The index of the observing instrument in satellite. elevation (float): The elevation (meters) above the datum in the WGS 84 coordinate system for which to calculate swath width. mask (Polygon, MultiPolygon, geopandas.GeoDataFrame, geopandas.GeoSeries): An optional mask to constrain results. coordinates (OrbitCoordinate): The coordinate system of orbit track points. orbit_output (OrbitOutput): The output option. sat_sunlit (bool): `True` to include whether the satellite is sunlit. solar_altaz (bool): `True` to include solar altitude/azimuth angles. solar_beta (bool): `True` to include solar beta angles. Returns: geopandas.GeoDataFrame: The data frame of collected orbit track results. """ if len(times) == 0: return _get_empty_orbit_track() # select the observing instrument instrument = satellite.instruments[instrument_index] # propagate orbit orbit_track = satellite.orbit.to_tle().get_orbit_track(times) ssp = wgs84.geographic_position_of(orbit_track) if mask is not None: # trim orbit track to provided mask mask_contains_ssp = [ ( any(mask.contains(Point(longitude, latitude))) if isinstance(mask, (gpd.GeoDataFrame, gpd.GeoSeries)) else mask.contains(Point(longitude, latitude)) ) for (longitude, latitude) in zip( ssp.longitude.degrees, ssp.latitude.degrees ) ] if not any(mask_contains_ssp): return _get_empty_orbit_track() orbit_track = orbit_track[mask_contains_ssp] # recompute ssp ssp = wgs84.geographic_position_of(orbit_track) # create shapely points in proper coordinate system if coordinates == OrbitCoordinate.WGS84: points = [ Point(longitude, latitude, elevation) for (longitude, latitude, elevation) in zip( ssp.longitude.degrees, ssp.latitude.degrees, ssp.elevation.m, ) ] elif coordinates == OrbitCoordinate.ECEF: points = [ Point(position[0], position[1], position[2]) for position in ssp.itrs_xyz.m.T ] else: points = [ Point(position[0], position[1], position[2]) for position in orbit_track.xyz.m.T ] # determine observation validity valid_obs = instrument.is_valid_observation(orbit_track) if len(orbit_track.t) == 1: # transform scalar to vector results valid_obs = np.array([valid_obs]) # create velocity points if needed if orbit_output == OrbitOutput.POSITION: records = [ { "time": time, "satellite": satellite.name, "instrument": instrument.name, "swath_width": field_of_regard_to_swath_width( ssp.elevation.m[i], instrument.field_of_regard, elevation, ), "valid_obs": valid_obs[i], "geometry": points[i], } for i, time in enumerate(orbit_track.t.utc_datetime()) ] else: # compute satellite velocity if coordinates == OrbitCoordinate.ECI: velocities = [ Point(velocity[0], velocity[1], velocity[2]) for velocity in orbit_track.velocity.m_per_s.T ] else: velocities = [ Point(velocity[0], velocity[1], velocity[2]) for velocity in orbit_track.frame_xyz_and_velocity(itrs)[1].m_per_s.T ] records = [ { "time": time, "satellite": satellite.name, "instrument": instrument.name, "swath_width": field_of_regard_to_swath_width( ssp.elevation.m[i], instrument.field_of_regard, elevation, ), "valid_obs": valid_obs[i], "geometry": points[i], "velocity": velocities[i], } for i, time in enumerate(orbit_track.t.utc_datetime()) ] track = gpd.GeoDataFrame(records, crs="EPSG:4326") if sat_sunlit: # append sat_sunlit column track["sat_sunlit"] = orbit_track.is_sunlit(de421) if solar_altaz: # append solar altitude/azimuth columns solar_altaz = ( (de421["earth"] + wgs84.geographic_position_of(orbit_track)) .at(orbit_track.t) .observe(de421["sun"]) .apparent() .altaz() ) track["solar_alt"] = solar_altaz[0].degrees track["solar_az"] = solar_altaz[1].degrees if solar_beta: # append solar beta column # based on https://github.com/skyfielders/python-skyfield/issues/1054 plane_normal = np.cross( orbit_track.position.m, orbit_track.velocity.m_per_s, axis=0 ) sun = de421["earth"].at(orbit_track.t).observe(de421["sun"]).position.m beta = np.pi / 2 - angle_between(plane_normal, sun) track["solar_beta"] = np.degrees(beta) if mask is not None: track = gpd.clip(track, mask).reset_index(drop=True) return track
def _get_empty_ground_track() -> gpd.GeoDataFrame: """ Gets an empty data frame for ground track results. Returns: geopandas.GeoDataFrame: Empty data frame. """ columns = { "time": pd.Series([], dtype="datetime64[ns, utc]"), "satellite": pd.Series([], dtype="str"), "instrument": pd.Series([], dtype="str"), "valid_obs": pd.Series([], dtype="bool"), "geometry": pd.Series([], dtype="object"), } return gpd.GeoDataFrame(columns, crs="EPSG:4326") def _get_utm_epsg_code(point: Point, swath_width: float) -> str: """ Get the Universal Transverse Mercator (UTM) EPSG code for a ground track. Args: point (Point): the geodetic sub-satellite point swath_width (float): the observation swath width (meters) Returns: str: the EPSG code """ # approximate footprint polygon = point.buffer(np.degrees(swath_width / 2 / EARTH_MEAN_RADIUS)) results = pyproj.database.query_utm_crs_info( datum_name="WGS 84", area_of_interest=pyproj.aoi.AreaOfInterest( *clip_by_rect(polygon, -180, -90, 180, 90).bounds ), ) # return 5041 for UPS North; 5042 for UPS South; UTM zone, or 4087 for default return ( "EPSG:5041" if polygon.bounds[3] > 84 else ( "EPSG:5042" if polygon.bounds[1] < -80 else "EPSG:" + results[0].code if len(results) > 0 else "EPSG:4087" ) )
[docs] def collect_ground_track( satellite: Satellite, times: List[datetime], instrument_index: int = 0, elevation: float = 0, mask: Optional[ Union[ Polygon, MultiPolygon, gpd.GeoDataFrame, gpd.GeoSeries, ] ] = None, crs: str = "EPSG:4087", sat_altaz: bool = False, solar_altaz: bool = False, ) -> gpd.GeoDataFrame: """ Collect ground track polygons for a satellite of interest. Args: satellite (Satellite): The observing satellite. instrument_index (int): The index of the observing instrument in satellite. times (typing.List[datetime.datetime]): The list of datetimes to sample. elevation (float): The elevation (meters) above the datum in the WGS 84 coordinate system for which to calculate ground track. mask (Polygon, MultiPolygon, geopandas.GeoDataFrame, geopandas.GeoSeries): An optional mask to constrain results. crs (str): The coordinate reference system (CRS) in which to compute distance (default: World Equidistant Cylindrical `"EPSG:4087"`). Selecting `crs="utm"` uses Universal Transverse Mercator (UTM) zones for non-polar regions, and Universal Polar Stereographic (UPS) systems for polar regions. Selecting `crs="spice"` uses SPICE to compute observation footprints. sat_altaz (bool): `True` to include satellite altitude/azimuth angles. solar_altaz (bool): `True` to include solar altitude/azimuth angles. Returns: geopandas.GeoDataFrame: The data frame of collected ground track results. """ if len(times) == 0: return _get_empty_ground_track() # propagate orbit orbit_track = satellite.orbit.to_tle().get_orbit_track(times) # select the observing instrument instrument = satellite.instruments[instrument_index] if mask is not None and len(times) > 1: ssp = wgs84.geographic_position_of(orbit_track) buffered_mask = buffer_target( geometry=( mask if isinstance(mask, (Polygon, MultiPolygon)) else ( mask.dissolve().iloc[0].geometry if isinstance(mask, gpd.GeoDataFrame) else mask.iloc[0] ) ), altitude=satellite.orbit.to_tle().get_altitude(), inclination=satellite.orbit.to_tle().get_inclination(), field_of_regard=instrument.field_of_regard, time_step=np.diff(times).mean() / timedelta(seconds=1), ) # cull orbit track with buffered mask buffered_mask_contains_ssp = [ ( any(buffered_mask.contains(Point(longitude, latitude))) if isinstance(buffered_mask, (gpd.GeoDataFrame, gpd.GeoSeries)) else buffered_mask.contains(Point(longitude, latitude)) ) for (longitude, latitude) in zip( ssp.longitude.degrees, ssp.latitude.degrees ) ] if not any(buffered_mask_contains_ssp): return _get_empty_ground_track() orbit_track = orbit_track[buffered_mask_contains_ssp] # compute footprint for culling footprint = instrument.compute_footprint(orbit_track, elevation=elevation) # cull orbit track to observable footprint mask_intersects_footprint = [ ( any(mask.intersects(f)) if isinstance(mask, (gpd.GeoDataFrame, gpd.GeoSeries)) else mask.intersects(f) ) for f in (footprint if isinstance(footprint, list) else [footprint]) ] if not any(mask_intersects_footprint): return _get_empty_ground_track() orbit_track = orbit_track[mask_intersects_footprint] # compute targets target = instrument.compute_footprint_center(orbit_track, elevation) # determine observation validity valid_obs = instrument.is_valid_observation(orbit_track, target) if len(orbit_track.t) == 1: # transform scalar to vector results valid_obs = np.array([valid_obs]) if crs == "spice": geometries = instrument.compute_footprint( orbit_track, None, elevation, ) if len(orbit_track.t) == 1: geometries = [geometries] else: # compute the orbit track of the satellite gdf = collect_orbit_track( satellite, orbit_track.t.utc_datetime(), instrument_index, elevation ) if crs == "utm": gdf["utm_crs"] = gdf.apply( lambda r: _get_utm_epsg_code(r.geometry, r.swath_width), axis=1 ) # preload transformers to_crs = {} from_crs = {} for code in gdf.utm_crs.unique(): to_crs[code] = pyproj.Transformer.from_crs( gdf.crs, pyproj.CRS(code), always_xy=True ) from_crs[code] = pyproj.Transformer.from_crs( pyproj.CRS(code), gdf.crs, always_xy=True ) geometries = gdf.apply( lambda r: buffer_footprint( r.geometry, to_crs[r.utm_crs], from_crs[r.utm_crs], r.swath_width, elevation, ), axis=1, ).values else: to_crs = pyproj.Transformer.from_crs( gdf.crs, pyproj.CRS(crs), always_xy=True ) from_crs = pyproj.Transformer.from_crs( pyproj.CRS(crs), gdf.crs, always_xy=True ) # construct polygons based on visible extent of instrument # project to specified elevation geometries = gdf.apply( lambda r: buffer_footprint( r.geometry, to_crs, from_crs, r.swath_width, elevation ), axis=1, ).values records = [ { "time": time, "satellite": satellite.name, "instrument": instrument.name, "valid_obs": valid_obs[i], "geometry": geometries[i], } for i, time in enumerate(orbit_track.t.utc_datetime()) ] track = gpd.GeoDataFrame(records, crs="EPSG:4326") if sat_altaz: # append satellite altitude/azimuth columns sat_altaz = (orbit_track - target.at(orbit_track.t)).altaz() track["sat_alt"] = sat_altaz[0].degrees track["sat_az"] = sat_altaz[1].degrees if solar_altaz: # append solar altitude/azimuth columns solar_altaz = ( (de421["earth"] + target) .at(orbit_track.t) .observe(de421["sun"]) .apparent() .altaz() ) track["solar_alt"] = solar_altaz[0].degrees track["solar_az"] = solar_altaz[1].degrees if mask is not None: track = gpd.clip(track, mask).reset_index(drop=True) return track
[docs] def compute_ground_track( satellite: Satellite, times: List[datetime], instrument_index: int = 0, elevation: float = 0, mask: Optional[ Union[ Polygon, MultiPolygon, gpd.GeoDataFrame, gpd.GeoSeries, ] ] = None, crs: str = "EPSG:4087", method: str = "point", dissolve_orbits: bool = True, ) -> gpd.GeoDataFrame: """ Compute the aggregated ground track for a satellite of interest. Args: satellite (Satellite): The observing satellite. instrument_index (int): The index of the observing instrument in satellite. times (typing.List[datetime.datetime]): The list of datetimes to sample. elevation (float): The elevation (meters) above the datum in the WGS 84 coordinate system for which to calculate ground track. mask (Polygon, MultiPolygon, geopandas.GeoDataFrame, geopandas.GeoSeries): An optional mask to constrain results. crs (str): The coordinate reference system (CRS) in which to compute distance (default: World Equidistant Cylindrical `"EPSG:4087"`). Selecting `crs="utm"` uses Universal Transverse Mercator (UTM) zones for non-polar regions, and Universal Polar Stereographic (UPS) systems for polar regions. Selecting `crs="spice"` uses SPICE to compute footprints. method (str): The method for computing ground track: `"point"` buffers individual points and `"line"` buffers a line of points. The line method is not compatible with the `crs="spice"` option above. dissolve_orbits (bool): True, to aggregate multiple orbits in one output. Returns: GeoDataFrame: The data frame of aggregated ground track results. """ if method not in ["point", "line"]: raise ValueError("Invalid method: " + str(method)) if method == "point": track = collect_ground_track( satellite, times, instrument_index, elevation, mask, crs ) if not track.empty: # assign orbit identifier track["orbit_id"] = [ (time - times[0]) // satellite.orbit.to_tle().get_orbit_period() for time in track.time ] # filter to valid observations and dissolve track = ( track[track.valid_obs].dissolve(by="orbit_id").reset_index(drop=True) ) if dissolve_orbits: track = track.dissolve() return track if method == "line": if crs == "spice": raise ValueError("The line method is not compatible with spice") track = collect_orbit_track(satellite, times, instrument_index, elevation, None) # assign orbit identifier track["orbit_id"] = [ (time - times[0]) // satellite.orbit.to_tle().get_orbit_period() for time in track.time ] # assign track identifiers to group contiguous observation periods track["track_id"] = ( (track.valid_obs != track.valid_obs.shift()).astype("int").cumsum() ) # filter to valid observations track = track[track.valid_obs].reset_index(drop=True) segments = [] swath_widths = [] for _, sub_track in track.groupby(["orbit_id", "track_id"]): # project points to specified elevation points = sub_track.geometry.apply(lambda p: Point(p.x, p.y, elevation)) # extract longitudes lon = sub_track.geometry.apply(lambda p: p.x) # extract average swath width swath_widths.append(sub_track.swath_width.mean()) # no anti-meridian crossings if all absolute longitude differences # are less than 180 deg if np.all(np.abs(np.diff(lon)) < 180): segments.append(LineString(points)) else: # find anti-meridian crossings and calculate shift direction # coords from W -> E (shift < 0) will add 360 degrees to E component # coords from E -> W (shift > 0) will subtract 360 degrees from W component shift = np.insert(np.cumsum(np.around(np.diff(lon) / 360)), 0, 0) points = [ Point(p.x - 360 * shift[i], p.y, p.z) for i, p in enumerate(points) ] # split along the anti-meridian (-180 for shift > 0; 180 for shift < 0) shift_dir = -180 if shift.max() >= 1 else 180 collection = split( LineString(points), LineString([(shift_dir, -180), (shift_dir, 180)]), ) # map longitudes from (-540, -180] to (-180, 180] and from [180, 540) to [-180, 180) segments.extend( [ ( LineString([(c[0] + 360, c[1], c[2]) for c in line.coords]) if np.all([c[0] <= -180 for c in line.coords]) else ( LineString( [(c[0] - 360, c[1], c[2]) for c in line.coords] ) if np.all([c[0] >= 180 for c in line.coords]) else LineString( [(c[0], c[1], c[2]) for c in line.coords] ) ) ) for line in collection.geoms ] ) to_crs = pyproj.Transformer.from_crs(track.crs, pyproj.CRS(crs), always_xy=True) from_crs = pyproj.Transformer.from_crs( pyproj.CRS(crs), track.crs, always_xy=True ) polygons = [ buffer_footprint(segment, to_crs, from_crs, swath_width, elevation) for segment, swath_width in zip(segments, swath_widths) ] # dissolve the original track track = track.dissolve(by=["orbit_id", "track_id"]).reset_index(drop=True) # and replace the geometry with the union of computed polygons track.geometry = polygons if mask is not None: track = gpd.clip(track, mask).reset_index(drop=True) if dissolve_orbits: track = track.dissolve() return track
def collect_ground_pixels( satellite: Satellite, times: List[datetime], instrument_index: int = 0, elevation: float = 0, mask: Optional[ Union[ Polygon, MultiPolygon, gpd.GeoDataFrame, gpd.GeoSeries, ] ] = None, sat_altaz: bool = False, solar_altaz: bool = False, ) -> gpd.GeoDataFrame: """ Collect ground pixels for a satellite of interest. Args: satellite (Satellite): The observing satellite. instrument_index (int): The index of the observing instrument in satellite. times (typing.List[datetime.datetime]): The list of datetimes to sample. elevation (float): The elevation (meters) above the datum in the WGS 84 coordinate system for which to calculate ground pixels. mask (Polygon, MultiPolygon, geopandas.GeoDataFrame, geopandas.GeoSeries): An optional mask to constrain results. sat_altaz (bool): `True` to include satellite altitude/azimuth angles. solar_altaz (bool): `True` to include solar altitude/azimuth angles. Returns: geopandas.GeoDataFrame: The data frame of collected ground pixels results. """ if len(times) == 0: return _get_empty_ground_track() # propagate orbit orbit_track = satellite.orbit.to_tle().get_orbit_track(times) # select the observing instrument instrument = satellite.instruments[instrument_index] if not isinstance(instrument, PointedInstrument) or not instrument.is_rectangular: raise ValueError( "Ground pixels are only compatible with rectangular PointedInstrument instances" ) if mask is not None and len(times) > 1: ssp = wgs84.geographic_position_of(orbit_track) buffered_mask = buffer_target( geometry=( mask if isinstance(mask, (Polygon, MultiPolygon)) else ( mask.dissolve().iloc[0].geometry if isinstance(mask, gpd.GeoDataFrame) else mask.iloc[0] ) ), altitude=satellite.orbit.to_tle().get_altitude(), inclination=satellite.orbit.to_tle().get_inclination(), field_of_regard=instrument.field_of_regard, time_step=np.diff(times).mean() / timedelta(seconds=1), ) # cull orbit track with buffered mask buffered_mask_contains_ssp = [ ( any(buffered_mask.contains(Point(longitude, latitude))) if isinstance(buffered_mask, (gpd.GeoDataFrame, gpd.GeoSeries)) else buffered_mask.contains(Point(longitude, latitude)) ) for (longitude, latitude) in zip( ssp.longitude.degrees, ssp.latitude.degrees ) ] if not any(buffered_mask_contains_ssp): return _get_empty_ground_track() orbit_track = orbit_track[buffered_mask_contains_ssp] # compute footprint for culling footprint = instrument.compute_footprint(orbit_track, elevation=elevation) # cull orbit track to observable footprint mask_intersects_footprint = [ ( any(mask.intersects(f)) if isinstance(mask, (gpd.GeoDataFrame, gpd.GeoSeries)) else mask.intersects(f) ) for f in (footprint if isinstance(footprint, list) else [footprint]) ] if not any(mask_intersects_footprint): return _get_empty_ground_track() orbit_track = orbit_track[mask_intersects_footprint] # compute the footprint pixel array geometries = instrument.compute_footprint_pixel_array( orbit_track, elevation, ) # construct results as a list of dictionaries records = [ { "time": time, "satellite": satellite.name, "instrument": instrument.name, "valid_obs": instrument.is_valid_observation( orbit_track[i], wgs84.latlon(point.y, point.x, point.z) ), "geometry": point, } for i, time in enumerate(orbit_track.t.utc_datetime()) for point in ( geometries[i].geoms if len(orbit_track.t) > 1 else geometries.geoms ) ] # build geodataframe gdf = gpd.GeoDataFrame(records, crs="EPSG:4326") if sat_altaz: # append satellite altitude/azimuth columns sat_altaz = [ ( orbit_track[list(orbit_track.t.utc_datetime()).index(record["time"])] - wgs84.latlon( record["geometry"].y, record["geometry"].x, record["geometry"].z ).at(timescale.from_datetime(record["time"])) ).altaz() for record in records ] gdf["sat_alt"] = list(map(lambda altaz: altaz[0].degrees, sat_altaz)) gdf["sat_az"] = list(map(lambda altaz: altaz[1].degrees, sat_altaz)) if solar_altaz: # append solar altitude/azimuth columns solar_altaz = [ ( de421["earth"] + wgs84.latlon( record["geometry"].y, record["geometry"].x, record["geometry"].z ) ) .at(timescale.from_datetime(record["time"])) .observe(de421["sun"]) .apparent() .altaz() for record in records ] gdf["solar_alt"] = list(map(lambda altaz: altaz[0].degrees, solar_altaz)) gdf["solar_az"] = list(map(lambda altaz: altaz[1].degrees, solar_altaz)) if mask is not None: gdf = gpd.clip(gdf, mask).reset_index(drop=True) return gdf