# Agent-based Transportation Model

Let's create a simple agent-based transportation model. First, start with a transportation network. Our transportation network can be created as a Networkx Graph or a (Geo)Pandas (Geo)DataFrame. There are some sample transportation networks available in dpd.mapping.samples. Real-life networks can be imported from OpenStreetMap via pyrosm.

In [None]:
from networkx import draw

from dpd.mapping.samples import cross_box

# Use the bundled "cross box" sample network for this walkthrough.
graph = cross_box

# Place every node at the x/y coordinates of its point geometry.
pos = {
    node: (attributes["geometry"].x, attributes["geometry"].y)
    for node, attributes in graph.nodes(data=True)
}

# Color assigned to each node "type"; nodes with any other (or no) type
# fall back to green.
color_by_type = {
    "stop_sign": "red",
    "yield_sign": "yellow",
    "stop_light": "blue",
    "stop": "orange",
}
node_color = [
    color_by_type.get(attributes.get("type"), "green")
    for _, attributes in graph.nodes(data=True)
]

draw(graph, pos=pos, node_color=node_color, with_labels=True)

In [None]:
from geopandas import GeoDataFrame
from matplotlib import pyplot as plt

# One row per edge / per node, built from the graph's attribute dictionaries.
edge_keys = list(graph.edges)
edges_df = GeoDataFrame([graph.edges[key] for key in edge_keys], index=edge_keys)
nodes_df = GeoDataFrame([graph.nodes[node] for node in graph.nodes])

fig = plt.figure(figsize=(9, 8))
ax = fig.add_subplot(1, 1, 1)


def _label_node(row):
    """Write the row's index label at the first coordinate of its geometry."""
    return ax.annotate(
        text=row.name, xy=row.geometry.coords[0], ha="center", size=20
    )


# Draw the nodes as large markers, label each one, then overlay the edges.
nodes_df["geometry"].plot(ax=ax, color=node_color, markersize=1000)
nodes_df.apply(_label_node, axis=1)
edges_df["geometry"].plot(ax=ax)

networkx provides the ability to compute a path from any node to another node. When using OpenStreetMap, the same can be accomplished via the Open Source Routing Machine.

In [None]:
from networkx import shortest_path

# Shortest route through the sample network from node 0 to node 1.
node_ids = shortest_path(graph, source=0, target=1)
print("Node IDs:", node_ids)

To create a few agents, we first create some transportation zones. For simplicity, we will create one zone per node, each with three Productions and three Attractions, so that each zone has one person who travels to each of the other zones. Below are some other DataFrames we can generate from the Zones DataFrame. We will also create a path for each person using networkx.

In [None]:
from dpd.modeling import Zones

# Four identical zones ("Zone 0" .. "Zone 3"), each with 3 productions and
# 3 attractions, indexed the same way as the node table.
zone_records = [
    {"Name": f"Zone {number}", "Production": 3, "Attraction": 3}
    for number in range(4)
]
zones = Zones(data=zone_records, index=nodes_df.index)

# Give every zone the geometry of the node that shares its index label.
zones["geometry"] = nodes_df["geometry"]
zones

In [None]:
# Distance table derived from the zones (presumably pairwise distances between
# the zone geometries -- confirm against Zones.calculate_distance_dataframe).
distance_dataframe = zones.calculate_distance_dataframe()
distance_dataframe

In [None]:
import numpy

from dpd.modeling import TripDataFrame

# Start from an all-ones 4x4 matrix labelled by zone on both axes (so every
# zone pair, same-zone pairs included, gets a 1), then pass each entry
# through int().
unit_trips = numpy.ones((4, 4))
trip_dataframe = TripDataFrame(
    data=unit_trips, index=zones.index, columns=zones.index
).map(int)

trip_dataframe

In [None]:
from dpd.modeling import Population

population = Population.from_trip_dataframe(trip_dataframe)

# Keep only the rows whose origin and destination zones differ.
crosses_zones = population.origin != population.destination
population = population[crosses_zones]
population

In [None]:
def _route_for(person):
    """Return the shortest node path from the row's origin to its destination."""
    return shortest_path(graph, person["origin"], person["destination"])


# Store each person's route (a list of node ids) alongside their trip.
population["node_ids"] = population.apply(_route_for, axis=1)
population

Next, we set up and run our agent-based model. We need to transform our transportation network into Python objects for Edges and Nodes. Again, this can be done with either a Graph or a (Geo)DataFrame.

In [None]:
from uuid import uuid4

from dpd.driving import EdgesLanesNodesDriver
from dpd.mapping import add_object_to_edges_and_nodes
from dpd.mapping.nodes import NodeModel
from dpd.mechanics import KinematicBodyWithAcceleration
from dpd.mechanics.datacollection import BODY_AGENT_REPORTERS
from dpd.modeling import TransportationModel

# Model that holds and steps the drivers. On top of the stock body reporters
# it is configured with an extra "geometry" agent reporter.
body_model = TransportationModel(
    agent_reporters=BODY_AGENT_REPORTERS | {"geometry": "geometry"}
)

node_model = NodeModel()

graph = add_object_to_edges_and_nodes(graph, node_model)


def _add_driver(route):
    """Create a body and a driver that follows ``route`` and schedule the driver."""
    body = KinematicBodyWithAcceleration(
        initial_acceleration=0.1,
        initial_velocity=0.1,
        initial_position=0,
        max_deceleration=0.1,
        min_velocity=0,
        unique_id=uuid4(),
        model=body_model,
    )
    driver = EdgesLanesNodesDriver.from_node_ids(
        nodes_dict=graph.nodes,
        edges_dict=graph.edges,
        node_ids=route,
        body=body,
        driver_final_velocity=0,
        unique_id=uuid4(),
        model=body_model,
    )
    body_model.schedule.add(driver)


# One driver per person, each following that person's precomputed route.
for route in population["node_ids"]:
    _add_driver(route)

# Step both models together until the body model stops reporting `running`.
while body_model.running:
    body_model.step()
    node_model.step()

df = GeoDataFrame(body_model.get_dataframe())
df

In [None]:
from datetime import datetime

from geopandas import GeoDataFrame
from movingpandas import TrajectoryCollection
from pandas import to_timedelta

START_TIME = datetime(1970, 1, 1, 0, 0, 0)

# gdf is another name for df (not a copy), so the index changes below are
# visible through both names.
gdf = df

# Treat the first index level as seconds elapsed since START_TIME and replace
# it with the corresponding timestamps.
step_offsets = to_timedelta(gdf.index.levels[0], unit="s")
gdf.index = gdf.index.set_levels(START_TIME + step_offsets, level=0)

# Move AgentID out of the index into a regular column so it can be used as
# the trajectory id.
gdf.reset_index(level="AgentID", inplace=True)

tc = TrajectoryCollection(gdf, "AgentID")
tc.add_speed()
tc.hvplot(line_width=10)

In [None]:
import random


def random_color():
    """Return a random color as a ``#rrggbb`` hex string.

    Each of the three channels is drawn independently from 0-255 and
    formatted as exactly two lowercase hex digits, so the result is always
    seven characters long. Slicing ``hex()`` output instead would drop the
    leading zero for channel values below 16 and produce malformed colors
    such as ``#a3f7``.
    """
    channels = (random.randint(0, 0xFF) for _ in range(3))
    return "#" + "".join(f"{channel:02x}" for channel in channels)


def _segment_feature(start_point, end_point, start_time, end_time, color):
    """Build the GeoJSON LineString feature for one step of a trajectory."""
    return {
        "type": "Feature",
        "geometry": {
            "type": "LineString",
            "coordinates": [
                [start_point.xy[0][0], start_point.xy[1][0]],
                [end_point.xy[0][0], end_point.xy[1][0]],
            ],
        },
        "properties": {
            "times": [start_time.isoformat(), end_time.isoformat()],
            "style": {
                "color": color,
                "weight": 5,
            },
        },
    }


features = []
for trajectory in tc.trajectories:
    # Every segment of one trajectory shares a single random color.
    color = random_color()
    # (timestamp, geometry) for each row, in row order.
    steps = list(zip(trajectory.df.index, trajectory.df["geometry"]))
    # Pair each row with the row before it; the first row has no predecessor
    # and therefore starts no segment.
    for (previous_time, previous_point), (time, point) in zip(steps, steps[1:]):
        features.append(
            _segment_feature(previous_point, point, previous_time, time, color)
        )

import folium
from folium.plugins import TimestampedGeoJson

m = folium.Map(location=[0, 0], zoom_start=16)

# Wrap the per-segment features in a single GeoJSON FeatureCollection and
# add it to the map as a time-animated layer.
feature_collection = {
    "type": "FeatureCollection",
    "features": features,
}
animated_layer = TimestampedGeoJson(
    feature_collection,
    period="PT1S",
    add_last_point=True,
    transition_time=1000,
)
animated_layer.add_to(m)

m