Last Christmas I got my dad a Raspberry Pi with an ADS-B receiver for collecting and sending data to https://www.flightradar24.com/, which lets you track and view flights. When you set up a receiver you get access to their Business account, which was the actual present: it lets my dad work out which planes are flying overhead while he's sat out in the garden.
I was quite interested in the data myself, and one evening started looking at some 3D geospatial data for the first time, in particular setting up the infrastructure for collecting it.
I'm sure there are better ways to do it, but below is how I put it together: a scalable, batch-driven analysis that needs no managed database, built on some simple serverless technology in AWS.
Feel free to explore the notebook below to see how I set up the system and analysed the data, or jump straight to the PyDeck visualisation example at https://blog.stuart-gordon.co.uk/notebooks/flight_tracks.html
System Architecture
Below is an overview of the system architecture I will be using. It leverages serverless services for most of the processing, which means no database needs to be maintained or managed: data is simply queried from the object store using AWS Athena. This wouldn't be ideal for a real-time application, but for an ad hoc batch analysis process it ticks a lot of boxes.
Data Source
The data for this analysis comes from a Flightradar24 receiver system, which gathers flight information including positions, altitudes, and timestamps, offering a comprehensive view of air traffic. First we shall outline the process of taking data from an aircraft.json file, typically generated by a flight-tracking setup like Flightradar24 on a Raspberry Pi, and publishing it to a RabbitMQ broker. The data can then be processed and analysed using Amazon Athena.
Reading Data from aircraft.json
ΒΆ
The aircraft.json file contains data about aircraft detected by the ADS-B receiver. We start by reading this data into Python.
import json
# Path to the aircraft.json file
file_path = '/path/to/aircraft.json'
# Read data from aircraft.json
with open(file_path, 'r') as file:
aircraft_data = json.load(file)
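For reference, the file is a single JSON object with a top-level timestamp, a message counter and an array of currently tracked aircraft. The field names below match the schema the Glue crawler infers later in this post, but the values themselves are made up for illustration.
# Illustrative structure only – the values are invented, the keys match the Glue schema shown later
example_snapshot = {
    "now": 1703322189.9,          # epoch seconds when the snapshot was written
    "messages": 123456,           # total ADS-B messages decoded so far
    "aircraft": [
        {
            "hex": "407a06",      # ICAO 24-bit airframe address
            "flight": "EZY153D",  # callsign, when broadcast
            "lat": 54.654327,
            "lon": -2.871713,
            "altitude": 32000,    # feet
            "seen": 0.4,          # seconds since the last message
            "rssi": -21.3,        # signal strength
        },
        # ... one entry per aircraft currently in view
    ],
}
print(f"{len(aircraft_data.get('aircraft', []))} aircraft in this snapshot")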
Establishing a Connection with the RabbitMQ Broker
Next, we establish a connection with a RabbitMQ broker. This example assumes the use of a managed AmazonMQ broker using Amazon’s root certificates.
import pika
import ssl
# RabbitMQ server details
rabbitmq_server = 'your_rabbitmq_server'
exchange_name = 'your_exchange_name'
routing_key = 'your_routing_key'
username = 'your_username'
password = 'your_password'
# SSL context for AMQPS
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
context.verify_mode = ssl.CERT_REQUIRED
# Connect to RabbitMQ
credentials = pika.PlainCredentials(username, password)
parameters = pika.ConnectionParameters(host=rabbitmq_server,
virtual_host='/',
credentials=credentials,
ssl_options=pika.SSLOptions(context),
port=5671) # Default port for AMQPS
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# Declare the exchange
channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True)
Publishing Data to RabbitMQ
Once the connection is established, we publish the JSON data to the topic exchange with a specified routing key.
# Publish data to the exchange with the routing key
channel.basic_publish(exchange=exchange_name,
routing_key=routing_key,
body=json.dumps(aircraft_data),
properties=pika.BasicProperties(
delivery_mode=2, # persistent message
))
# Close the connection
connection.close()
This process can then be automated on the Raspberry Pi to run on a schedule, in my case every 15 seconds.
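Cron only schedules down to one-minute granularity, so for a 15-second cadence the simplest options are a systemd timer or a small long-running loop. Below is a minimal sketch of the loop approach; it reuses the file_path, parameters, exchange_name and routing_key defined above, and publish_snapshot is just a hypothetical helper name.
import time

INTERVAL_SECONDS = 15  # publish cadence used in this post

def publish_snapshot():
    """Read the latest aircraft.json and publish it to the exchange."""
    with open(file_path, 'r') as file:
        data = json.load(file)
    # A short-lived connection per publish keeps the loop alive if the broker drops the connection
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.basic_publish(exchange=exchange_name,
                          routing_key=routing_key,
                          body=json.dumps(data),
                          properties=pika.BasicProperties(delivery_mode=2))
    connection.close()

while True:
    try:
        publish_snapshot()
    except Exception as exc:
        print(f"Publish failed: {exc}")  # log and carry on through transient errors
    time.sleep(INTERVAL_SECONDS)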
Processing Data with Amazon Athena
Finally, the data in RabbitMQ can be consumed by a Lambda function that forwards it to Amazon S3, where it can be queried and analysed using Amazon Athena.
import json
import base64
import boto3
import os
import uuid
import datetime
def is_json(myjson):
try:
json.loads(myjson)
except ValueError as e:
return False
return True
S3_BUCKET=os.getenv('s3Bucket')
S3_BUCKET_FOLDER=os.getenv('s3BucketFolder')
s3 = boto3.client('s3')
def put_s3_object(content, queues, message_format):
# print(content, queues, message_format)
date_time = datetime.datetime.now()
date_timestamp = date_time.strftime("%Y%m%d_%H%M%S_%f")
file_name = '/'.join([f"{queues}", f"date={date_time.year}{date_time.month:02d}{date_time.day:02d}", f"{str(uuid.uuid4())}"])
full_file_name = S3_BUCKET_FOLDER + "/" + file_name + '.' + message_format
multiline_string = '\r\n'.join(content)
s3.put_object(Body=multiline_string,Key=full_file_name,Bucket=S3_BUCKET)
def lambda_handler(event, context):
# print(event)
for queues in event['rmqMessagesByQueue']:
print (f"Queue: {queues}")
messages = []
for message in event['rmqMessagesByQueue'][queues]:
data = base64.b64decode(message['data']).decode('ascii')
if is_json(data):
messages.append(data)
print (f"Decoded messages in {queues} - {len(messages)}")
put_s3_object(messages, queues, "json")
Data Processing
To ensure the quality and usability of the data, we’ve performed various processing steps: cleaning, formatting, and structuring the data to make it suitable for analysis and visualisation. First we used an AWS Glue Crawler to scan the initial files and build a schema for querying in Amazon Athena:
CREATE EXTERNAL TABLE `glengyle__fr24`(
`now` double COMMENT 'from deserializer',
`messages` int COMMENT 'from deserializer',
`aircraft` array<struct<hex:string,squawk:string,altitude:int,mlat:array<string>,tisb:array<string>,messages:int,seen:double,rssi:double,flight:string,lat:double,lon:double,nucp:int,seen_pos:double>> COMMENT 'from deserializer')
PARTITIONED BY (
`date` string)
ROW FORMAT SERDE
'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'paths'='aircraft,messages,now')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://open-data-archive/fr24/Glengyle::fr24/'
TBLPROPERTIES (
'CrawlerSchemaDeserializerVersion'='1.0',
'CrawlerSchemaSerializerVersion'='1.0',
'UPDATED_BY_CRAWLER'='fr24',
'averageRecordSize'='368',
'classification'='json',
'compressionType'='none',
'objectCount'='4',
'partition_filtering.enabled'='true',
'recordCount'='4',
'sizeKey'='1473',
'typeOfData'='file')
From here the data can be processed via batch queries – this works really well for generating CSV files for exploratory data analysis, in this case a summary of all the different aircraft detected.
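The exact summary query isn't shown here, but as an illustration, something along the lines below could roll the raw snapshots up into one row per detected aircraft. The table and column names are taken from the Glue schema above; everything else (the ROW aggregation, the partition filter) is an assumption about how such a query might be written.
# Illustrative only: a sketch of the kind of query behind the aircraft summary used later.
# The ROW(...) values serialise to the "{lat, lon, alt, timestamp}" strings parsed further down.
aircraft_summary_query = """
SELECT
    a.hex,
    a.flight,
    min(from_unixtime(t."now")) AS detected_start_time,
    max(from_unixtime(t."now")) AS detected_end_time,
    min(a.altitude)             AS min_altitude,
    max(a.altitude)             AS max_altitude,
    array_agg(ROW(a.lat, a.lon, a.altitude, from_unixtime(t."now"))
              ORDER BY t."now") AS geospatial_track
FROM "fr24"."glengyle__fr24" t
CROSS JOIN UNNEST(t.aircraft) AS x(a)
WHERE "date" = '20231223'
  AND a.lat IS NOT NULL
GROUP BY a.hex, a.flight
"""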
Exploratory Data Analysis
The first step was to get a general view of the data on a map.
import boto3
import time
import pydeck as pdk
import folium
import os
os.environ['USE_PYGEOS'] = '0'
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point
from datetime import datetime
import h3
import matplotlib.pyplot as plt
import seaborn as sns
import branca.colormap as cm
import numpy as np
We will initially use an offline CSV file downloaded from the AWS Athena console.
# Load your data into a DataFrame
flight_data = pd.read_csv('fr24-z.csv')
A simple Folium map was used to plot the processed geospatial tracks:
# Creating a base map centered around the specified base station location
base_location = (56.2, -4.6)
# Re-creating the base map with the base station location
m = folium.Map(location=base_location, zoom_start=8)
# Adding the base station location marker to the map
folium.Marker(
location=base_location,
popup="Base Station",
icon=folium.Icon(color='red', icon='info-sign')
).add_to(m)
Due to the way the geospatial track is serialised by Athena, an almost reverse parser is required to extract the data into a useful form for processing:
def custom_parse_geospatial_track(track_str):
# Splitting the string into individual track points
track_points = track_str.strip('[]').split('}, {')
# Parsing each track point
track = []
for point in track_points:
# Cleaning and splitting each point into lat, lon, and timestamp
point_clean = point.strip('{}').split(', ')
if len(point_clean) >= 2:
lat, lon = point_clean[0], point_clean[1]
try:
# Attempt to convert lat and lon to float and add to track
track.append((float(lat), float(lon)))
except ValueError:
# If conversion fails, skip this point
continue
return track
# Adding flight tracks to the map with the custom parsing function
for _, row in flight_data.iterrows():
track = custom_parse_geospatial_track(row['geospatial_track'])
if track:
folium.PolyLine(track, color="blue", weight=2.5, opacity=1).add_to(m)
# Display the map with flight tracks
m
Some interesting observations:
- Most of the detected planes are to the south of the receiver location
- You can start to see the main flight corridors
- This came from only two hours of data capture, yet over 100 planes were detected in that short window (a Friday evening, 23rd Dec 2023 – granted, people flying home for Christmas)
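As a rough check on that last point, the distinct aircraft count can be pulled straight from the summary CSV – this assumes the file carries the same hex and flight columns as the Athena table shown later.
# Quick sanity check on the size of the capture
# (hex is the ICAO airframe address; callsigns can be blank or repeated)
print("Distinct airframes:", flight_data['hex'].nunique())
print("Distinct callsigns:", flight_data['flight'].dropna().nunique())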
Automating the Data Pipeline
To make importing the data easier, an automated pipeline retrieves the CSV output of the Athena query directly from the notebook.
# Function to check if the file exists in S3
def check_file_exists(bucket_name, s3_file_key):
s3 = boto3.client('s3')
try:
s3.head_object(Bucket=bucket_name, Key=s3_file_key)
return True
except s3.exceptions.ClientError:
return False
# Function to download file from S3
def download_file_from_s3(bucket_name, s3_file_key, local_file_path):
s3 = boto3.client('s3')
s3.download_file(bucket_name, s3_file_key, local_file_path)
def query_athena_and_download(query, database, s3_output, bucket):
athena = boto3.client('athena')
# Start the query execution
response = athena.start_query_execution(
QueryString=query,
QueryExecutionContext={'Database': database},
ResultConfiguration={'OutputLocation': s3_output}
)
query_execution_id = response['QueryExecutionId']
# Wait for the query to complete
while True:
response = athena.get_query_execution(QueryExecutionId=query_execution_id)
state = response['QueryExecution']['Status']['State']
if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
break
time.sleep(1) # Wait before polling again
if state == 'SUCCEEDED':
# Correcting the result file key
# Assuming s3_output is something like 's3://bucket_name/path/to/output/'
result_file_key = s3_output.replace('s3://{}/'.format(bucket), '') + query_execution_id + '.csv'
# Check if the file exists in S3 and wait if it doesn't
while not check_file_exists(bucket, result_file_key):
time.sleep(1) # Wait and check again
local_file_path = 'result.csv' # Local file path for downloaded result
# Download the result file from S3
download_file_from_s3(bucket, result_file_key, local_file_path)
# Load the result into a Pandas DataFrame
df = pd.read_csv(local_file_path)
return df
else:
raise Exception(f"Query failed: {state}")
# Example usage
athena_query = """
SELECT * FROM "fr24"."aircraft_smmary"
"""
athena_database = "fr24"
s3_output_path = "s3://open-data-archive/applications/athena/"
# Extract bucket name and file key from s3_output_path
bucket_name = 'open-data-archive'
df = query_athena_and_download(athena_query, athena_database, s3_output_path, bucket_name)
I decided to explore this a little further; loading the data into a GeoPandas spatial dataframe allows for some easier spatial processing.
def custom_parse_geospatial_track(track_str):
track_points = track_str.strip('[]').split('}, {')
track = []
for point in track_points:
point_clean = point.strip('{}').split(', ')
if len(point_clean) >= 4:
lat, lon, alt, timestamp = point_clean[0], point_clean[1], point_clean[2], point_clean[3]
try:
# Parse the timestamp
timestamp = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f')
# Create a tuple of Point and timestamp
track.append((Point(float(lat), float(lon), float(alt)), timestamp))
except ValueError:
continue
return track
track_data = []
for _, row in df.iterrows():
track_points = custom_parse_geospatial_track(row['geospatial_track'])
for point, timestamp in track_points:
track_point_data = row.to_dict()
track_point_data['geometry'] = point
track_point_data['timestamp'] = timestamp
track_point_data['id'] = str(track_point_data['flight']) + "_" + str(track_point_data['hex'])
track_data.append(track_point_data)
track_df = pd.DataFrame(track_data)
geo_track_df = gpd.GeoDataFrame(track_df, geometry='geometry')
# Extracting longitude, latitude, and altitude from the geometry column
geo_track_df['longitude'] = geo_track_df['geometry'].x
geo_track_df['latitude'] = geo_track_df['geometry'].y
geo_track_df['altitude'] = geo_track_df['geometry'].z * 0.3048
geo_track_df.head()
|  | hex | detected_start_time | detected_end_time | min_altitude | max_altitude | flight | geospatial_track | geometry | timestamp | id | longitude | latitude | altitude |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 407a06 | 2023-12-23 09:03:09.900 | 2023-12-23 09:16:08.200 | 10350 | 32050 | EZY153D | [{54.654327, -2.871713, 32000, 2023-12-23 09:0… | POINT Z (54.65433 -2.87171 32000.00000) | 2023-12-23 09:03:09.900 | EZY153D _407a06 | 54.654327 | -2.871713 | 9753.60 |
| 1 | 407a06 | 2023-12-23 09:03:09.900 | 2023-12-23 09:16:08.200 | 10350 | 32050 | EZY153D | [{54.654327, -2.871713, 32000, 2023-12-23 09:0… | POINT Z (54.69795 -2.91987 32025.00000) | 2023-12-23 09:03:41.100 | EZY153D _407a06 | 54.697948 | -2.919866 | 9761.22 |
| 2 | 407a06 | 2023-12-23 09:03:09.900 | 2023-12-23 09:16:08.200 | 10350 | 32050 | EZY153D | [{54.654327, -2.871713, 32000, 2023-12-23 09:0… | POINT Z (54.74399 -2.97080 32050.00000) | 2023-12-23 09:04:11.200 | EZY153D _407a06 | 54.743989 | -2.970803 | 9768.84 |
| 3 | 407a06 | 2023-12-23 09:03:09.900 | 2023-12-23 09:16:08.200 | 10350 | 32050 | EZY153D | [{54.654327, -2.871713, 32000, 2023-12-23 09:0… | POINT Z (54.79189 -3.02382 31650.00000) | 2023-12-23 09:04:43.300 | EZY153D _407a06 | 54.791885 | -3.023825 | 9646.92 |
| 4 | 407a06 | 2023-12-23 09:03:09.900 | 2023-12-23 09:16:08.200 | 10350 | 32050 | EZY153D | [{54.654327, -2.871713, 32000, 2023-12-23 09:0… | POINT Z (54.83761 -3.07318 30975.00000) | 2023-12-23 09:05:14.500 | EZY153D _407a06 | 54.837605 | -3.073176 | 9441.18 |
Time Series Analysis
With data now coming in over a longer period than the initial CSV covered, we can do some time series analysis to see how many flights are detected over time. The number of unique flights detected in quite a remote area is surprising.
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns
# Apply the seaborn style
sns.set(style="darkgrid")
# Convert the time columns to datetime objects
geo_track_df['detected_start_time'] = pd.to_datetime(geo_track_df['detected_start_time'])
geo_track_df['detected_end_time'] = pd.to_datetime(geo_track_df['detected_end_time'])
# Create an empty DataFrame for the time intervals
time_intervals = pd.date_range(start=geo_track_df['detected_start_time'].min(),
end=geo_track_df['detected_end_time'].max(),
freq='T') # 'T' for minute intervals
# Initialize a Series to store the count of unique flights
unique_flight_counts = pd.Series(0, index=time_intervals)
# Count the number of unique flights in each interval
for interval in time_intervals:
active_flights = geo_track_df[(geo_track_df['detected_start_time'] <= interval) &
(geo_track_df['detected_end_time'] >= interval)]['id']
unique_flight_counts[interval] = active_flights.nunique()
# Plotting
plt.figure(figsize=(12, 6))
plt.plot(unique_flight_counts.index, unique_flight_counts.values)
plt.xlabel('Time')
plt.ylabel('Number of Unique Flights Detected')
plt.title('Unique Flights Detected Over Time (1-Minute Intervals)')
# Format x-axis to show labels for each hour
plt.gca().xaxis.set_major_locator(mdates.HourLocator(interval=1)) # Show every hour
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d %H:%M')) # Format datetime
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
It would be interesting to see whether there is a specific group of flights running between 10pm and 5am, as I was under the assumption that most UK airports stop flights during night-time hours to reduce noise pollution. In which case, what flights are going on during this time? Are they long-haul flights to the US, for example?
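As a starting point for answering that, a rough filter for flights whose detection window touches the overnight period might look like the sketch below. It assumes the timestamps are UTC and simply checks the start and end hours, so a flight spanning the whole night could slip through.
# Flights first or last seen between 22:00 and 05:00
night_mask = (geo_track_df['detected_start_time'].dt.hour >= 22) | \
             (geo_track_df['detected_end_time'].dt.hour < 5)

night_flights = (geo_track_df[night_mask]
                 .drop_duplicates('id')
                 [['flight', 'hex', 'detected_start_time', 'detected_end_time',
                   'min_altitude', 'max_altitude']]
                 .sort_values('detected_start_time'))

print(f"{len(night_flights)} unique flights detected overnight")
night_flights.head(20)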
# Define altitude bands
altitude_bands = {'Very Low': (0, 10000), 'Low': (10001, 20000), 'Medium': (20001, 30000), 'High': (30001, 35000), 'Very High': (35001, 40000), 'Very Very High': (40001, 50000)}
# Initialize DataFrame for unique flight counts
unique_flight_counts = pd.DataFrame(0, index=time_intervals, columns=altitude_bands.keys())
# Count unique flights
for interval in time_intervals:
active_flights = geo_track_df[(geo_track_df['detected_start_time'] <= interval) & (geo_track_df['detected_end_time'] >= interval)]
for band, (min_alt, max_alt) in altitude_bands.items():
flights_in_band = active_flights[(active_flights['min_altitude'] >= min_alt) & (active_flights['max_altitude'] <= max_alt)]['id']
unique_flight_counts.loc[interval, band] = flights_in_band.nunique()
# Plotting
plt.figure(figsize=(12, 6))
# Create stacked area plot
plt.stackplot(unique_flight_counts.index, unique_flight_counts.T, labels=altitude_bands.keys())
plt.xlabel('Time')
plt.ylabel('Cumulative Number of Unique Flights Detected')
plt.title('Unique Flights Detected Over Time by Altitude Band (Stacked Area Plot)')
# Format x-axis
plt.gca().xaxis.set_major_locator(mdates.HourLocator(interval=1))
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d %H:%M'))
plt.legend(loc='upper left')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
3D Visualisation & KMeans Clustering
This implies quite an interesting mix of flights at different altitudes, and there are actually a number of low flights still taking place. This might look better visualised in a 3D scatter plot, adding a simple KMeans clustering to group the data.
import geopandas as gpd
import plotly.express as px
from sklearn.cluster import KMeans
fig = px.scatter_3d(geo_track_df, x='longitude', y='latitude', z='altitude',
color_discrete_sequence=px.colors.qualitative.Plotly)
fig.update_layout(margin=dict(l=0, r=0, b=0, t=0))
fig.show()
# Perform K-means clustering
k = 8 # Adjust this based on your requirement
kmeans = KMeans(n_clusters=k)
geo_track_df['cluster'] = kmeans.fit_predict(geo_track_df[['longitude', 'latitude', 'altitude']])
# Interactive scatter plot with clusters
fig = px.scatter_3d(geo_track_df, x='longitude', y='latitude', z='altitude',
color='cluster', color_continuous_scale=px.colors.sequential.Viridis)
fig.update_layout(margin=dict(l=0, r=0, b=0, t=0))
fig.show()
One of the interesting parts here is the shape of the detection area, which seems to be driven by the skyshed around where the receiver is located. I would love to look at this in a bit more detail; however, the accurate (1 m) LiDAR data required is only available across England (100% coverage) and some areas of Scotland… and as we live in quite a sparsely populated area it doesn't seem to be of the highest concern and isn't part of the Scottish LiDAR coverage. It would be great to do a 3D skyshed analysis using the DTM/DSM to see if there is a correlation, and/or whether we could optimise the receiver's position.
This shows some quite clear bands, but altitude seems to have been the big driver in the clustering. Further feature engineering could support the multivariate clustering, with features such as speed or direction that could be calculated. To explore this quickly, a haversine distance was calculated between each plane position and the receiver.
If we were to add this into the clustering we would need to normalise the data so that KMeans works properly across the variables – see the sketch after the distance and bearing calculation below.
def haversine(lat1, lon1, lat2, lon2):
# Radius of the Earth in km
R = 6371.0
# Convert coordinates from degrees to radians
lat1_rad = np.radians(lat1)
lon1_rad = np.radians(lon1)
lat2_rad = np.radians(lat2)
lon2_rad = np.radians(lon2)
# Difference in coordinates
dlat = lat2_rad - lat1_rad
dlon = lon2_rad - lon1_rad
# Haversine formula
a = np.sin(dlat / 2)**2 + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(dlon / 2)**2
c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
# Distance in kilometers
distance = R * c
return distance
def calculate_bearing(lat1, lon1, lat2, lon2):
# Convert coordinates from degrees to radians
lat1_rad = np.radians(lat1)
lon1_rad = np.radians(lon1)
lat2_rad = np.radians(lat2)
lon2_rad = np.radians(lon2)
# Calculate the bearing
dlon = lon2_rad - lon1_rad
x = np.sin(dlon) * np.cos(lat2_rad)
y = np.cos(lat1_rad) * np.sin(lat2_rad) - np.sin(lat1_rad) * np.cos(lat2_rad) * np.cos(dlon)
initial_bearing = np.arctan2(x, y)
# Convert bearing from radians to degrees
initial_bearing = np.degrees(initial_bearing)
compass_bearing = (initial_bearing + 360) % 360
return compass_bearing
def total_distance_and_bearing(row, target_lat, target_lon, target_alt):
# Calculate geographic distance
geo_dist = haversine(row['longitude'], row['latitude'], target_lat, target_lon)
# Calculate bearing
bearing = calculate_bearing(target_lat, target_lon, row['longitude'], row['latitude'])
    # Convert altitudes from metres to kilometres
row_altitude_km = row['altitude'] / 1000
target_alt_km = target_alt / 1000
# Calculate altitude difference in kilometers
alt_diff = abs(row_altitude_km - target_alt_km)
# Total distance considering both geographic and altitude differences
total_dist = np.sqrt(geo_dist**2 + alt_diff**2)
return total_dist, bearing
# Target location coordinates and altitude in meters
target_lat, target_lon, target_alt = 56.2, -4.6, 10
# Apply the distance and bearing calculation for each row in your DataFrame
geo_track_df[['distance_to_target', 'bearing_to_target']] = geo_track_df.apply(
lambda row: total_distance_and_bearing(row, target_lat, target_lon, target_alt),
axis=1, result_type='expand')
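Picking up the normalisation point from above, below is a minimal sketch of folding the new features into the clustering: everything is standardised to zero mean and unit variance first so that altitude (metres) doesn't swamp distance (km) and bearing (degrees) in the Euclidean distance KMeans uses.
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans

# Standardise each feature before clustering so no single variable dominates
feature_cols = ['longitude', 'latitude', 'altitude',
                'distance_to_target', 'bearing_to_target']
scaled_features = StandardScaler().fit_transform(geo_track_df[feature_cols])

kmeans_scaled = KMeans(n_clusters=8, n_init=10, random_state=42)
geo_track_df['cluster_scaled'] = kmeans_scaled.fit_predict(scaled_features)
Bearing is circular, so a fuller treatment would encode it as sine and cosine components rather than raw degrees before scaling.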
Polar Plots
However, a more interesting way to draw this information is through a polar plot.
# Set Seaborn style
sns.set(style="whitegrid")
# Convert bearings to radians for plotting
bearings_rad = np.radians(geo_track_df['bearing_to_target'])
# Define the feature for color coding
color_feature = 'altitude' # Replace with the name of your feature column
# Create polar plot using Matplotlib
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(111, polar=True)
# Plot data with color based on the feature
sc = ax.scatter(bearings_rad, geo_track_df['distance_to_target'],
c=geo_track_df[color_feature], cmap='viridis', alpha=0.7)
# Adding a color bar
cbar = plt.colorbar(sc, ax=ax)
cbar.set_label(color_feature, rotation=270, labelpad=15)
ax.set_theta_zero_location('N') # Set the direction of 0 degrees to the top (North)
ax.set_theta_direction(-1) # Set the direction of degrees to be clockwise
ax.set_title("Polar Plot of Distances (Km) and Directions from Reciever Point", va='bottom')
plt.show()
Aggregation
For individual points this works very well, but the points overlap each other. As an alternative, we can aggregate the data onto an H3 grid. (Using a dark theme just makes it more visually pleasing than anything else.)
# Assuming geo_track_df is your GeoDataFrame
# Add a column for the H3 index at a specified resolution (e.g., resolution 7)
resolution = 6
geo_track_df['h3_index'] = geo_track_df.apply(lambda row: h3.geo_to_h3(row['geometry'].y, row['geometry'].x, resolution), axis=1)
# Aggregate data by H3 index
# You can adjust the aggregation functions as per your requirements
agg_data = geo_track_df.groupby('h3_index').agg({
'timestamp': ['count', 'min', 'max'],
'min_altitude': ['mean', 'min', 'max'],
'max_altitude': ['mean', 'min', 'max'],
# Add other fields as necessary
}).reset_index()
# Renaming columns for clarity
agg_data.columns = ['_'.join(col).strip() for col in agg_data.columns.values]
agg_data.rename(columns={'h3_index_': 'h3_index'}, inplace=True)
# Initialize a Folium map with a dark base map
m = folium.Map(location=[55.5, -4], zoom_start=8, tiles='CartoDB dark_matter')
# Create a color map
max_count = agg_data['timestamp_count'].max()
color_map = cm.linear.YlOrRd_09.scale(0, max_count)
# Function to determine color based on count
def get_color(count):
return color_map(count)
# Add a choropleth layer
for _, row in agg_data.iterrows():
boundary = h3.h3_to_geo_boundary(row['h3_index'], geo_json=True)
tooltip = f"H3 Index: {row['h3_index']}\nPlane Count: {row['timestamp_count']}"
folium.Polygon(
locations=boundary,
fill=True,
color='black', # outline color
weight=0,
fill_color=get_color(row['timestamp_count']),
fill_opacity=0.9,
tooltip=tooltip
).add_to(m)
# Add the color map to the map
color_map.add_to(m)
# Show the map
m
This gives quite an interesting visual and starts to form a heatmap of the most popular areas, but planes exist and move in 3D space, so it is worth looking at that Z component. This also needs a new visualisation package, as Folium is mainly limited to 2D; PyDeck (built on deck.gl) brings the ability to handle large 3D datasets.
Preparing the data for PyDeck requires some custom processing:
def custom_parse_geospatial_track_for_pydeck(track_str, min_altitude, max_altitude):
track_points = track_str.strip('[]').split('}, {')
track = []
for point in track_points:
point_clean = point.strip('{}').split(', ')
if len(point_clean) >= 2:
lat, lon = point_clean[0], point_clean[1]
# Using the average of min and max altitude as a placeholder for altitude
altitude = (min_altitude + max_altitude) / 2
try:
# Swapping lat and lon to correct the order
track.append({'position': [float(lon), float(lat), float(altitude)]})
except ValueError:
continue
return track
# Function to map altitude to a color on a red-to-green scale
def altitude_to_color(altitude, min_altitude, max_altitude):
# Normalize altitude in the range 0 to 1
normalized_altitude = (altitude - min_altitude) / (max_altitude - min_altitude)
# Red to green color scale
red = int((1 - normalized_altitude) * 255)
green = int(normalized_altitude * 255)
blue = 0 # No blue component
return [red, green, blue, 200] # RGBA color
# Finding the minimum and maximum altitudes in the dataset
min_altitude = df['min_altitude'].min()
max_altitude = df['max_altitude'].max()
# Preparing the data for pydeck visualization with altitude-based color
data_for_pydeck_with_color = []
for _, row in df.iterrows():
track = custom_parse_geospatial_track_for_pydeck(row['geospatial_track'], row['min_altitude'], row['max_altitude'])
if track:
# Flatten the track into individual segments for pydeck LineLayer
for i in range(len(track) - 1):
# Determine the color based on the altitude
color = altitude_to_color((track[i]['position'][2] + track[i + 1]['position'][2]) / 2, min_altitude, max_altitude)
segment = {
'source_position': track[i]['position'],
'target_position': track[i + 1]['position'],
'color': color,
'flight': row['flight'],
'hex': row['hex'],
}
data_for_pydeck_with_color.append(segment)
# Prepare tooltip information
tooltip = {
"html": "<b>Flight:</b> {flight} <br><b>Hex:</b> {hex}",
"style": {
"backgroundColor": "steelblue",
"color": "white"
}
}
# Create the pydeck layer
layer = pdk.Layer(
"LineLayer",
data_for_pydeck_with_color[0:int(len(data_for_pydeck_with_color)/1)],
get_source_position="source_position",
get_target_position="target_position",
get_width=3,
get_color="color",
pickable=True
)
# Set the view with an 85-degree pitch for a more horizontal perspective
view_state = pdk.ViewState(latitude=56.2, longitude=-4.6, zoom=6, pitch=85)
# Render the map
r = pdk.Deck(layers=[layer], initial_view_state=view_state, tooltip=tooltip)
r.to_html('flight_tracks.html')
This PyDeck output is simply stunning. Now that the data pipeline is set up, I might just leave it running for a few weeks or months to build up a larger dataset for further analysis (and wait for a holiday to get some time to look at it). One of the benefits of having the data collection and initial cleaning in place is that I can come back to the data in future and get a suitable dataset very quickly – or build a live dashboard that continually updates.
Conclusion
This exploratory analysis of flight data has provided some intriguing insights into the Flightradar24 and ADS-B data, as well as the stages involved in creating an automated data collection, processing and analysis pipeline.