# Source code for tatc.analysis.latency

# -*- coding: utf-8 -*-
"""
Methods to perform latency analysis.

@author: Isaac Feldman, Paul T. Grogan <pgrogan@stevens.edu>
"""

from typing import List, Union
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import geopandas as gpd
from shapely import geometry as geo
from skyfield.api import wgs84, EarthSatellite

from ..schemas.point import GroundStation
from ..schemas.satellite import Satellite

from .coverage import _get_visible_interval_series


def _get_empty_downlinks_frame() -> gpd.GeoDataFrame:
    """
    Builds a zero-row data frame carrying the downlink result schema.

    Returns:
        geopandas.GeoDataFrame: Empty data frame.
    """
    # (column name, dtype) pairs in output column order
    schema = (
        ("station", "str"),
        ("geometry", "object"),
        ("satellite", "str"),
        ("start", "datetime64[ns, utc]"),
        ("epoch", "datetime64[ns, utc]"),
        ("end", "datetime64[ns, utc]"),
    )
    empty_columns = {name: pd.Series([], dtype=dtype) for name, dtype in schema}
    return gpd.GeoDataFrame(empty_columns, crs="EPSG:4326")





def _get_empty_latency_frame() -> gpd.GeoDataFrame:
    """
    Gets an empty data frame for latency results.

    The columns mirror those of a latency result: the observed point and
    satellite/instrument, the observation time, the downlinking station and
    time, and the resulting latency.

    Returns:
        geopandas.GeoDataFrame: Empty data frame.
    """
    columns = {
        "point_id": pd.Series([], dtype="int"),
        "geometry": pd.Series([], dtype="object"),
        "satellite": pd.Series([], dtype="str"),
        "instrument": pd.Series([], dtype="str"),
        "observed": pd.Series([], dtype="datetime64[ns, utc]"),
        "station": pd.Series([], dtype="str"),
        "downlinked": pd.Series([], dtype="datetime64[ns, utc]"),
        "latency": pd.Series([], dtype="timedelta64[ns]"),
    }
    return gpd.GeoDataFrame(columns, crs="EPSG:4326")


def compute_latencies(
    observations: gpd.GeoDataFrame, downlinks: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """
    Collect latencies between an observation and the first downlink opportunity.

    For each observation, the first downlink opportunity is the downlink for
    the same satellite with the earliest ``start`` strictly after the
    observation's ``end``. Latency is measured from the observation ``epoch``
    to that downlink's ``epoch``.

    Args:
        observations (geopandas.GeoDataFrame): The data frame of observations
            to downlink.
        downlinks (geopandas.GeoDataFrame): The data frame of downlink
            opportunities. It does not need to be sorted (e.g. it may be a
            concatenation of results from several stations).

    Returns:
        geopandas.GeoDataFrame: The data frame of collected latency results.
            The observation ``epoch`` is renamed to ``observed``, the
            ``start``/``epoch``/``end`` columns are dropped, and ``station``,
            ``downlinked`` and ``latency`` are added. These three are ``None``
            when no later downlink exists for the satellite.
    """
    if observations.empty or downlinks.empty:
        return _get_empty_latency_frame()

    # Sort once so that the first matching row below really is the earliest
    # opportunity; a stable sort keeps the original relative order among
    # downlinks that share a start time.
    downlinks = downlinks.sort_values("start", kind="stable")

    def _align_downlinks(row):
        # filter downlinks for this satellite that begin after the observation
        dls = downlinks[
            np.logical_and(
                row.satellite == downlinks.satellite, row.end < downlinks.start
            )
        ]
        # append latency-specific columns
        if dls.empty:
            row["station"] = None
            row["downlinked"] = None
            row["latency"] = None
        else:
            first = dls.iloc[0]
            row["station"] = first.station
            row["downlinked"] = first.epoch
            row["latency"] = first.epoch - row.epoch
        return row

    # append the latency-specific columns
    observations = observations.apply(_align_downlinks, axis=1)
    # add observed column
    observations["observed"] = observations["epoch"]
    # drop start, epoch, and end columns
    observations = observations.drop(["start", "epoch", "end"], axis=1)
    return observations
def _get_empty_reduce_frame() -> gpd.GeoDataFrame:
    """
    Builds a zero-row data frame carrying the reduced latency schema.

    Returns:
        geopandas.GeoDataFrame: Empty data frame.
    """
    # (column name, dtype) pairs in output column order
    schema = (
        ("point_id", "int"),
        ("geometry", "object"),
        ("latency", "timedelta64[ns]"),
        ("samples", "int"),
    )
    empty_columns = {name: pd.Series([], dtype=dtype) for name, dtype in schema}
    return gpd.GeoDataFrame(empty_columns, crs="EPSG:4326")
def reduce_latencies(latency_observations: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Reduce observation latencies.

    Computes descriptive statistics for each pair of observation and first
    downlink opportunities, grouped by ``point_id``: the mean ``latency`` and
    the number of observation rows (``samples``). ``samples`` counts every
    row, including observations that were never downlinked.

    Args:
        latency_observations (geopandas.GeoDataFrame): The latency observations.

    Returns:
        geopandas.GeoDataFrame: The data frame with reduced latencies. For
            non-empty input, ``point_id`` is the index produced by
            ``dissolve``. The empty-input frame instead carries ``point_id``
            as a column. A point without any latency value gets ``NaT``.
    """
    # notna() preserves shape, so an emptiness test only needs the frame itself
    if latency_observations.empty:
        return _get_empty_reduce_frame()
    # operate on a copy of the dataframe
    gdf = latency_observations.copy()
    # convert latency to a numeric value (seconds) before aggregation
    gdf["latency"] = gdf["latency"] / timedelta(seconds=1)
    # assign each record to one observation
    gdf["samples"] = 1
    # perform the aggregation operation
    gdf = gdf.dissolve(
        "point_id",
        aggfunc={
            "latency": "mean",
            "samples": "sum",
        },
    )
    # convert latency from numeric values after aggregation
    gdf["latency"] = gdf["latency"].apply(
        lambda t: pd.NaT if pd.isna(t) else timedelta(seconds=t)
    )
    return gdf
def grid_latencies(
    reduced_latencies: gpd.GeoDataFrame, cells: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """
    Grid reduced latencies to cells.

    Each cell's ``samples`` is the sum of the ``samples`` of the points it
    contains. Its ``latency`` is the mean of those points' latencies,
    weighted by each point's ``samples``. If any contained point has a
    missing latency, the cell's latency is missing (``NaT``).

    Args:
        reduced_latencies (geopandas.GeoDataFrame): The reduced latencies.
        cells (geopandas.GeoDataFrame): The cell specification.

    Returns:
        geopandas.GeoDataFrame: The data frame with gridded latencies. For
            empty ``reduced_latencies``, this is a copy of ``cells`` with zero
            samples and no latency. Otherwise, only cells containing at least
            one point are included.
    """
    if reduced_latencies.empty:
        gdf = cells.copy()
        gdf["samples"] = 0
        gdf["latency"] = None
        return gdf
    # operate on a copy of the data frame
    gdf = reduced_latencies.copy()
    # convert latency to numeric values (seconds) before aggregation
    gdf["latency"] = gdf["latency"] / timedelta(seconds=1)
    joined = cells.sjoin(gdf, how="inner", predicate="contains")
    # The join keeps the cells' index, so weights cannot be looked up in the
    # point frame by index label. Weight each joined row by its own (point)
    # samples instead, then divide the per-cell sum by the total samples.
    joined["_weighted_latency"] = joined["latency"] * joined["samples"]
    gdf = joined.dissolve(
        by="cell_id",
        aggfunc={
            "samples": "sum",
            # skipna=False propagates missing latencies like np.average does
            "_weighted_latency": lambda s: s.sum(skipna=False),
        },
    ).reset_index()
    gdf["latency"] = gdf.pop("_weighted_latency") / gdf["samples"]
    # convert latency from numeric values after aggregation
    gdf["latency"] = gdf["latency"].apply(
        lambda t: pd.NaT if pd.isna(t) else timedelta(seconds=t)
    )
    return gdf