
Project - Analyzing Transit Feeds

Posted on: August 2, 2025

Objective

To analyze public transit data with batch and streaming analytics, using Kafka Connect and Flink SQL.

Tech Stack

Data Sources

Texas and Austin Open Data Portals (I like working with geospatial data and have relied on these portals before).

| Link | Note | Update Frequency |
|------|------|------------------|
| https://data.texas.gov/d/cuc7-ywmd | Vehicle Positions (JSON): location, speed, which trip it's on, etc. | 15 seconds |
| https://data.texas.gov/d/mqtr-wwpy | Trip Updates (JSON): where it is now, which stops it's heading to next, etc. | 15 seconds |
| https://data.austintexas.gov/d/dx9v-zd7x | Traffic Incidents (JSON): road incidents involving cars, bikes, etc. | 5 minutes |
| https://data.texas.gov/d/r4v4-vz24 | Schedule data (CSV): stops, stop timings, fare structure, etc. | Static/lookup data |

The transit data follows the GTFS specification, created by Google (it is to transit data roughly what the OpenAPI specification is to REST APIs).

I ingested the JSON version of the data, but it is also provided in Protobuf (schema here).

Sample Data:

Ingesting data into Kafka

For ingesting the data, I used Kafka Connect connectors:

| Topic(s) | Connector | Ingestion Frequency |
|----------|-----------|---------------------|
| vehiclePositions, tripUpdates | HTTP Source | 1 minute |
| trafficIncidents | HTTP Source | 5 minutes |
| stops, fare_attributes, transfers, routes, stop_times | S3 Source | - |

After ingesting the data, I used Gemini to create data contracts (JSON Schema) and then applied them to the topics (all except the schedule data).

partition=1 to save $$


For stream transformations and analytics, I used Flink SQL:

Transformations:

The GTFS-Realtime data has multiple levels of nesting (of type ROW in Flink), so to make our lives easier and store only the relevant facts (without metadata), we flatten (UNNEST) the data and keep only specific columns.

Generating fact tables from vehiclePositions and tripUpdates

CREATE TABLE fact_trip_updates (WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
DISTRIBUTED BY (id) INTO 3 BUCKETS
WITH (
    'changelog.mode' = 'append', 'kafka.cleanup-policy' = 'delete', 'kafka.retention.time' = '1 day',
    'scan.startup.mode' = 'latest-offset', 'key.format' = 'json-registry', 'value.format' = 'json-registry'
) AS
SELECT
    t.id,
    -- Unpack fields from the top-level 'tripUpdate' ROW
    t.tripUpdate.trip.tripId,
    t.tripUpdate.trip.routeId,
    t.tripUpdate.vehicle.id AS vehicleId,
    -- Unpack fields from the unnested 'stop_update' ROW
    stop_update.stopSequence,
    stop_update.stopId,
    stop_update.arrival.`time` AS arrival_time,
    stop_update.departure.`time` AS departure_time,
    -- Carry over the Flink event time for each new row
    t.`$rowtime` AS event_time
FROM
    tripUpdates AS t,
    -- Use UNNEST to expand the stopTimeUpdate array into individual rows
    UNNEST(t.tripUpdate.stopTimeUpdate) AS stop_update;
    
CREATE TABLE fact_vehicle_positions (WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
DISTRIBUTED BY (id) INTO 1 BUCKETS
WITH (
    'changelog.mode' = 'append', 'kafka.cleanup-policy' = 'delete', 'kafka.retention.time' = '1 day',
    'scan.startup.mode' = 'latest-offset', 'key.format' = 'json-registry', 'value.format' = 'json-registry'
) AS
SELECT
    id,
    -- Unpack fields from the nested 'vehicle' ROW
    vehicle.trip.tripId,
    vehicle.trip.routeId,
    `vehicle`.`position`.`latitude`,
    `vehicle`.`position`.`longitude`,
    `vehicle`.`position`.`speed`,
    vehicle.currentStatus,
    vehicle.stopId,
    `vehicle`.`timestamp` AS vehicle_timestamp,
    `$rowtime` AS event_time
FROM
    vehiclePositions;

Analytics:

  1. Average speed and number of vehicles per route (last 10 minutes)
  2. Current location of each vehicle
  3. Forgot to take more screenshots (and my $25 of Confluent credits ran out ☠️ before I could retry). But I did some more analysis as well: average delays over the past hour, routes with the most delays, updated ETAs for each stop, and a check for whether delayed stops had traffic incidents nearby (using a geospatial UDF based on haversine distance). Rough sketches of a few of these queries follow below.
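
These are illustrative reconstructions, not the exact statements I ran; the window size, the dedup key, and the placeholder coordinate are assumptions.

Average speed and vehicle count per route over 10-minute tumbling windows:

SELECT
    window_start,
    window_end,
    routeId,
    AVG(speed)         AS avg_speed,
    COUNT(DISTINCT id) AS vehicle_count
FROM TABLE(
    TUMBLE(TABLE fact_vehicle_positions, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, routeId;

Current location of each vehicle, using Flink's deduplication (Top-N) pattern to keep only the latest row per vehicle id:

SELECT id, routeId, latitude, longitude, currentStatus
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY event_time DESC) AS row_num
    FROM fact_vehicle_positions
)
WHERE row_num = 1;

The geospatial UDF is essentially the haversine formula. Inlined, and filtering vehicles to within ~1 km of a single placeholder coordinate (30.2672, -97.7431 is roughly downtown Austin, not real incident data; the real check compared delayed stops against the trafficIncidents feed), it looks something like this:

SELECT *
FROM (
    SELECT
        id,
        routeId,
        stopId,
        -- haversine distance (km) from the placeholder point
        2 * 6371 * ASIN(SQRT(
            POWER(SIN(RADIANS(30.2672 - latitude) / 2), 2) +
            COS(RADIANS(latitude)) * COS(RADIANS(30.2672)) *
            POWER(SIN(RADIANS(-97.7431 - longitude) / 2), 2)
        )) AS distance_km
    FROM fact_vehicle_positions
)
WHERE distance_km < 1.0;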

Loading data into S3

For loading both the raw real-time data and the derived analytics data into S3, I used the S3 Sink connector.

Loading raw data into S3
Loading derived data into S3

Then I used Glue Data Catalog to store table metadata.
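
For reference, the Glue table metadata for one of these datasets is roughly equivalent to an Athena-style external table over the sink's S3 prefix. A minimal sketch for the vehicle-positions data, assuming the S3 Sink wrote JSON records under a topics/<topic>/ prefix (the bucket name, path layout, and column types below are assumptions):

CREATE EXTERNAL TABLE fact_vehicle_positions (
    id                string,
    tripId            string,
    routeId           string,
    latitude          double,
    longitude         double,
    speed             double,
    currentStatus     string,
    stopId            string,
    vehicle_timestamp bigint
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://my-transit-bucket/topics/fact_vehicle_positions/';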

Batch Analytics with Athena

With all the data stored in S3, we can use Athena (or Redshift Serverless) to run batch/ad-hoc analytics. Due to limitations of my free account plan, I couldn't do this.
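
Still, to give a flavor of the batch side, this is the kind of ad-hoc query the S3 + Glue + Athena combination would support (a sketch against the table definition sketched above, assuming vehicle_timestamp holds epoch seconds):

SELECT
    routeId,
    date_trunc('hour', from_unixtime(vehicle_timestamp)) AS hour_bucket,
    AVG(speed)                                           AS avg_speed,
    COUNT(DISTINCT id)                                    AS active_vehicles
FROM fact_vehicle_positions
GROUP BY routeId, date_trunc('hour', from_unixtime(vehicle_timestamp))
ORDER BY hour_bucket DESC, avg_speed;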

I had already exhausted my 30-day QuickSight free trial, so I couldn't use that either. I was planning to use it for time-series and map visualizations.

Takeaways


Extending this system

Readings
