Source code for tatc.analysis.track

# -*- coding: utf-8 -*-
"""
Methods to generate coverage statistics.

@author: Paul T. Grogan <pgrogan@stevens.edu>
"""

from typing import List, Union, Optional
from datetime import datetime

import pandas as pd
import numpy as np
import geopandas as gpd
from skyfield.api import wgs84, EarthSatellite
from shapely.geometry import (
    Polygon,
    MultiPolygon,
    Point,
    LineString,
)
from shapely.ops import transform, unary_union
import pyproj

from ..schemas.satellite import Satellite
from ..schemas.instrument import Instrument
from ..utils import (
    split_polygon,
    field_of_regard_to_swath_width,
)
from ..constants import timescale


def _get_empty_orbit_track() -> gpd.GeoDataFrame:
    """
    Gets an empty data frame for orbit track results.

    Returns:
        geopandas.GeoDataFrame: Empty data frame.
    """
    columns = {
        "time": pd.Series([], dtype="datetime64[ns, utc]"),
        "satellite": pd.Series([], dtype="str"),
        "instrument": pd.Series([], dtype="str"),
        "swath_width": pd.Series([], dtype="float"),
        "valid_obs": pd.Series([], dtype="bool"),
        "geometry": pd.Series([], dtype="object"),
    }
    return gpd.GeoDataFrame(columns, crs="EPSG:4326")


[docs]def collect_orbit_track( satellite: Satellite, instrument: Instrument, times: List[datetime], elevation: float = 0, mask: Optional[Union[Polygon, MultiPolygon]] = None, ) -> gpd.GeoDataFrame: """ Collect orbit track points for a satellite of interest. Args: satellite (Satellite): The observing satellite. instrument (Instrument): The observing instrument. times (typing.List[datetime.datetime]): The list of times to sample. elevation (float): The elevation (meters) above the datum in the WGS 84 coordinate system for which to calculate swath width. mask (Polygon or MultiPolygon): An optional mask to constrain results. Returns: geopandas.GeoDataFrame: The data frame of collected orbit track results. """ if len(times) == 0: return _get_empty_orbit_track() # convert orbit to tle orbit = satellite.orbit.to_tle() # construct a satellite for propagation sat = EarthSatellite(orbit.tle[0], orbit.tle[1], satellite.name) # create skyfield time series ts_times = timescale.from_datetimes(times) # compute satellite positions positions = sat.at(ts_times) # project to geographic positions subpoints = [wgs84.geographic_position_of(position) for position in positions] # create shapely points points = [ Point( subpoint.longitude.degrees, subpoint.latitude.degrees, subpoint.elevation.m ) for subpoint in subpoints ] valid_obs = instrument.is_valid_observation(sat, ts_times) records = [ { "time": time, "satellite": satellite.name, "instrument": instrument.name, "swath_width": field_of_regard_to_swath_width( subpoints[i].elevation.m, instrument.field_of_regard, elevation, ), "valid_obs": valid_obs[i], "geometry": points[i], } for i, time in enumerate(times) ] gdf = gpd.GeoDataFrame(records, crs="EPSG:4326") if mask is None: return gdf return gpd.clip(gdf, mask).reset_index(drop=True)
def _get_utm_epsg_code(point: Point) -> str: """ Get the Universal Transverse Mercator (UTM) EPSG code for a geodetic point. Args: point (Point): the geodetic point Returns: str: the UTM EPSG code """ results = pyproj.database.query_utm_crs_info( datum_name="WGS 84", area_of_interest=pyproj.aoi.AreaOfInterest(point.x, point.y, point.x, point.y), ) # return the first code if exists; otherwise return a default value # 5041 is UPS North; 5042 is UPS South; 4087 is the default return ( "EPSG:" + results[0].code if len(results) > 0 else "EPSG:5041" if point.y > 84 else "EPSG:5042" if point.y < -80 else "EPSG:4087" )
[docs]def collect_ground_track( satellite: Satellite, instrument: Instrument, times: List[datetime], elevation: float = 0, mask: Optional[Union[Polygon, MultiPolygon]] = None, crs: str = "EPSG:4087", ) -> gpd.GeoDataFrame: """ Collect ground track polygons for a satellite of interest. Args: satellite (Satellite): The observing satellite. instrument (Instrument): The observing instrument. times (typing.List[datetime.datetime]): The list of datetimes to sample. elevation (float): The elevation (meters) above the datum in the WGS 84 coordinate system for which to calculate ground track. mask (Polygon or MultiPolygon): An optional mask to constrain results. crs (str): The coordinate reference system (CRS) in which to compute distance (default: World Equidistant Cylindrical `"EPSG:4087"`). Selecting `crs="utm"` uses Universal Transverse Mercator (UTM) zones for non-polar regions, and Universal Polar Stereographic (UPS) systems for polar regions. Returns: geopandas.GeoDataFrame: The data frame of collected ground track results. """ # first, compute the orbit track of the satellite gdf = collect_orbit_track(satellite, instrument, times, elevation, mask) if gdf.empty: return gdf # project points to specified elevation gdf.geometry = gdf.geometry.apply(lambda p: Point(p.x, p.y, elevation)) # at each point, draw a buffer equivalent to the swath radius if crs == "utm": # do the swath projection in the matching utm zone utm_crs = gdf.geometry.apply(_get_utm_epsg_code) for code in utm_crs.unique(): to_crs = pyproj.Transformer.from_crs( gdf.crs, pyproj.CRS(code), always_xy=True ).transform from_crs = pyproj.Transformer.from_crs( pyproj.CRS(code), gdf.crs, always_xy=True ).transform gdf.loc[utm_crs == code, "geometry"] = gdf[utm_crs == code].apply( lambda r: transform( from_crs, transform(to_crs, r.geometry).buffer(r.swath_width / 2), ), axis=1, ) else: # do the swath projection in the specified coordinate reference system to_crs = pyproj.Transformer.from_crs( gdf.crs, pyproj.CRS(crs), always_xy=True ).transform from_crs = pyproj.Transformer.from_crs( pyproj.CRS(crs), gdf.crs, always_xy=True ).transform gdf.geometry = gdf.apply( lambda r: transform( from_crs, transform(to_crs, r.geometry).buffer(r.swath_width / 2), ), axis=1, ) # add elevation to all polygon coordinates (otherwise lost during buffering) gdf.geometry = gdf.geometry.apply( lambda g: Polygon( [(p[0], p[1], elevation) for p in g.exterior.coords], [(p[0], p[1], elevation) for i in g.interiors for p in i.coords], ) ) # split polygons to wrap over the anti-meridian and poles gdf.geometry = gdf.apply(lambda r: split_polygon(r.geometry), axis=1) if mask is None: return gdf return gpd.clip(gdf, mask).reset_index(drop=True)
[docs]def compute_ground_track( satellite: Satellite, instrument: Instrument, times: List[datetime], elevation: float = 0, mask: Optional[Union[Polygon, MultiPolygon]] = None, crs: str = "EPSG:4087", method: str = "point", ) -> gpd.GeoDataFrame: """ Compute the aggregated ground track for a satellite of interest. Args: satellite (Satellite): The observing satellite. instrument (Instrument): The observing instrument. times (typing.List[datetime.datetime]): The list of datetimes to sample. elevation (float): The elevation (meters) above the datum in the WGS 84 coordinate system for which to calculate ground track. mask (Polygon or MultiPolygon): An optional mask to constrain results. crs (str): The coordinate reference system (CRS) in which to compute distance (default: World Equidistant Cylindrical `"EPSG:4087"`). Selecting `crs="utm"` uses Universal Transverse Mercator (UTM) zones for non-polar regions, and Universal Polar Stereographic (UPS) systems for polar regions. method (str): The method for computing ground track: `"point"` buffers individual points while `"line"` buffers a line of points. Returns: GeoDataFrame: The data frame of aggregated ground track results. """ if method == "point": track = collect_ground_track(satellite, instrument, times, elevation, mask, crs) # filter to valid observations and dissolve return track[track.valid_obs].dissolve() if method == "line": track = collect_orbit_track(satellite, instrument, times, elevation, mask) # filter to valid observations track = track[track.valid_obs].reset_index(drop=True) # project points to specified elevation points = track.geometry.apply(lambda p: Point(p.x, p.y, elevation)) # extract longitudes lons = track.geometry.apply(lambda p: p.x) if np.any(np.diff(np.sign(lons))): # split lines when crossing meridian or anti-meridian points_list = np.split( points, 1 + np.where(np.diff(np.sign(lons)))[0], ) segments = [ LineString(pts.tolist()) if len(pts.index) > 1 else pts[0] for pts in points_list ] else: segments = [LineString(points)] to_crs = pyproj.Transformer.from_crs( track.crs, pyproj.CRS(crs), always_xy=True ).transform from_crs = pyproj.Transformer.from_crs( pyproj.CRS(crs), track.crs, always_xy=True ).transform # apply the coordinate reference system transformation polygons = [ transform( from_crs, transform(to_crs, segment).buffer(track.swath_width.mean() / 2), ) for segment in segments ] # add elevation to all polygon coordinates (otherwise lost during buffering) polygons = [ Polygon( [(p[0], p[1], elevation) for p in g.exterior.coords], [(p[0], p[1], elevation) for i in g.interiors for p in i.coords], ) for g in polygons ] # split polygons if necessary polygons = list(map(split_polygon, polygons)) # dissolve the original track track = track.dissolve() # and replace the geometry with the union of computed polygons track.geometry = [unary_union(polygons)] return track raise ValueError("Invalid method: " + str(method))