Ockam Routing and Transports enable higher-level protocols that provide end-to-end guarantees to messages traveling across many network connection hops and protocol boundaries.
Ockam Routing is a simple and lightweight message-based protocol that makes it possible to bidirectionally exchange messages over a large variety of communication topologies.
Ockam Transports adapt Ockam Routing to various transport protocols like TCP, UDP, WebSockets, Bluetooth etc.
By layering Ockam Secure Channels and other higher level protocols over Ockam Routing, it is possible to build systems that provide end-to-end guarantees over arbitrary transport topologies that span many networks, connections, gateways, queues, and clouds.
Routing
Let's dive into how the routing protocol works. So far, in the section on Nodes and Workers, we've come across this simple message exchange:
Ockam Routing Protocol messages carry with them two metadata fields: an onward_route and a return_route. A route is an ordered list of addresses describing the path a message should travel. This information is carried with the message in compact binary form.
Pay close attention to the Sender, Hop, and Replier rules in the sequence diagrams below. Note how onward_route and return_route are handled as the message travels.
The above was one message hop. We may extend this to two hops:
This very simple protocol extends to any number of hops:
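The handling of onward_route and return_route can be modelled in a few lines of plain Rust. The sketch below is a minimal illustration and not the ockam crate's implementation: the Message struct and the send, hop, reply, and round_trip functions are hypothetical names, and the Sender and Replier rules are written as assumptions (the sender puts its own address in the return_route; the replier sends its reply along the incoming return_route). The Hop rule follows the description used throughout this guide: remove your own address from the front of the onward_route and insert it at the front of the return_route.

```rust
// A std-only model of the routing rules. These types are illustrative
// and are NOT the ockam crate's Route or LocalMessage.
use std::collections::VecDeque;

#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub onward_route: VecDeque<String>,
    pub return_route: VecDeque<String>,
    pub payload: String,
}

/// Sender rule (assumed): onward_route is the path to the destination,
/// return_route starts with the sender's own address.
pub fn send(sender: &str, route: &[&str], payload: &str) -> Message {
    Message {
        onward_route: route.iter().map(|a| a.to_string()).collect(),
        return_route: VecDeque::from(vec![sender.to_string()]),
        payload: payload.to_string(),
    }
}

/// Hop rule: remove own address from the front of onward_route and
/// insert it at the front of return_route.
pub fn hop(own: &str, mut msg: Message) -> Result<Message, String> {
    match msg.onward_route.pop_front() {
        Some(front) if front == own => {
            msg.return_route.push_front(front);
            Ok(msg)
        }
        Some(front) => Err(format!("{own} received a message addressed to {front}")),
        None => Err(format!("{own} received a message with an empty onward_route")),
    }
}

/// Replier rule (assumed): the reply travels along the incoming
/// return_route, and its return_route starts with the replier's address.
pub fn reply(own: &str, msg: Message) -> Message {
    Message {
        onward_route: msg.return_route,
        return_route: VecDeque::from(vec![own.to_string()]),
        payload: msg.payload,
    }
}

/// Sends `payload` from "app" through `hops` to "echoer" and returns
/// the reply as it looks when it arrives back at "app".
pub fn round_trip(hops: &[&str], payload: &str) -> Result<Message, String> {
    let mut route: Vec<&str> = hops.to_vec();
    route.push("echoer");

    // Outbound leg: each hop moves its address from onward to return.
    let mut msg = send("app", &route, payload);
    for h in hops {
        msg = hop(h, msg)?;
    }

    // Return leg: the reply retraces the hops in reverse order.
    let mut msg = reply("echoer", msg);
    for h in hops.iter().rev() {
        msg = hop(h, msg)?;
    }
    Ok(msg)
}
```

Calling round_trip(&["h1", "h2", "h3"], "Hello Ockam!") yields a reply whose onward_route holds only "app" and whose return_route lists "h1", "h2", "h3", "echoer", so the app could answer again without knowing the topology in advance.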
Routing over two hops
So far, we've created an "echoer" worker in our node, sent it a message, and received a reply. This worker was a simple one hop away from our "app" worker.
To achieve this, messages carry with them two metadata fields: onward_route and return_route, where a route is a list of addresses.
To get a sense of how that works, let's route a message over two hops.
Hop worker
For demonstration, we'll create a simple worker, called Hop, that takes every incoming message and forwards it to the next address in the onward_route of that message.
Just before forwarding the message, Hop's handle message function will:
Print the message
Remove its own address (first address) from the onward_route, by calling step()
Insert its own address as the first address in the return_route by calling prepend()
// src/hop.rs
use ockam::{Any, Context, Result, Routed, Worker};

pub struct Hop;

#[ockam::worker]
impl Worker for Hop {
    type Context = Context;
    type Message = Any;

    /// This handle function takes any incoming message and forwards
    /// it to the next hop in its onward route
    async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<Any>) -> Result<()> {
        println!("Address: {}, Received: {:?}", ctx.primary_address(), msg);

        // Send the message to the next worker on its onward_route
        ctx.forward(msg.into_local_message().step_forward(ctx.primary_address().clone())?)
            .await
    }
}
App worker
Next, let's create our main "app" worker.
In the code below we start an Echoer worker at address "echoer" and a Hop worker at address "h1". Then, we send a message along the h1 => echoer route by passing route!["h1", "echoer"] to send(..).
// examples/03-routing.rs
// This node routes a message.

use hello_ockam::{Echoer, Hop};
use ockam::{node, route, Context, Result};

#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
    // Create a node with default implementations
    let mut node = node(ctx).await?;

    // Start a worker, of type Echoer, at address "echoer"
    node.start_worker("echoer", Echoer)?;

    // Start a worker, of type Hop, at address "h1"
    node.start_worker("h1", Hop)?;

    // Send a message to the worker at address "echoer",
    // via the worker at address "h1"
    node.send(route!["h1", "echoer"], "Hello Ockam!".to_string()).await?;

    // Wait to receive a reply and print it.
    let reply = node.receive::<String>().await?;
    println!("App Received: {}", reply.into_body()?); // should print "Hello Ockam!"

    // Stop all workers, stop the node, cleanup and return.
    node.shutdown().await
}
To run this new node program:
cargo run --example 03-routing
Routing over many hops
Similarly, we can also route the message via many hop workers:
// examples/03-routing-many-hops.rs
// This node routes a message through many hops.

use hello_ockam::{Echoer, Hop};
use ockam::{node, route, Context, Result};

#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
    // Create a node with default implementations
    let mut node = node(ctx).await?;

    // Start an Echoer worker at address "echoer"
    node.start_worker("echoer", Echoer)?;

    // Start 3 hop workers at addresses "h1", "h2" and "h3".
    node.start_worker("h1", Hop)?;
    node.start_worker("h2", Hop)?;
    node.start_worker("h3", Hop)?;

    // Send a message to the echoer worker via the "h1", "h2", and "h3" workers
    let r = route!["h1", "h2", "h3", "echoer"];
    node.send(r, "Hello Ockam!".to_string()).await?;

    // Wait to receive a reply and print it.
    let reply = node.receive::<String>().await?;
    println!("App Received: {}", reply.into_body()?); // should print "Hello Ockam!"

    // Stop all workers, stop the node, cleanup and return.
    node.shutdown().await
}
To run this new node program:
cargo run --example 03-routing-many-hops
Transport
An Ockam Transport is a plugin for Ockam Routing. It moves Ockam Routing messages using a specific transport protocol like TCP, UDP, WebSockets, Bluetooth etc.
In previous examples, we routed messages locally within one node. Routing messages over transport layer connections looks very similar.
Let's try the TcpTransport. We'll need to create two nodes: a responder and an initiator.
Responder node
// examples/04-routing-over-transport-responder.rs
// This node starts a tcp listener and an echoer worker.
// It then runs forever waiting for messages.

use hello_ockam::Echoer;
use ockam::tcp::{TcpListenerOptions, TcpTransportExtension};
use ockam::{node, Context, Result};

#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
    // Create a node with default implementations
    let node = node(ctx).await?;

    // Initialize the TCP Transport
    let tcp = node.create_tcp_transport()?;

    // Create an echoer worker
    node.start_worker("echoer", Echoer)?;

    // Create a TCP listener and wait for incoming connections.
    let listener = tcp.listen("127.0.0.1:4000", TcpListenerOptions::new()).await?;

    // Allow access to the Echoer via TCP connections from the TCP listener
    node.flow_controls()
        .add_consumer(&"echoer".into(), listener.flow_control_id());

    // Don't call node.shutdown() here so this node runs forever.
    Ok(())
}
Initiator node
// examples/04-routing-over-transport-initiator.rs
// This node routes a message, to a worker on a different node, over the tcp transport.

use ockam::tcp::{TcpConnectionOptions, TcpTransportExtension};
use ockam::{node, route, Context, Result};

#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
    // Create a node with default implementations
    let mut node = node(ctx).await?;

    // Initialize the TCP Transport.
    let tcp = node.create_tcp_transport()?;

    // Create a TCP connection to a different node.
    let connection_to_responder = tcp.connect("localhost:4000", TcpConnectionOptions::new()).await?;

    // Send a message to the "echoer" worker on a different node, over a tcp transport.
    // Wait to receive a reply and print it.
    let r = route![connection_to_responder, "echoer"];
    let reply = node.send_and_receive::<String>(r, "Hello Ockam!".to_string()).await?;

    println!("App Received: {}", reply); // should print "Hello Ockam!"

    // Stop all workers, stop the node, cleanup and return.
    node.shutdown().await
}
Run
Run the responder in a separate terminal tab and keep it running:
cargo run --example 04-routing-over-transport-responder
Run the initiator:
cargo run --example 04-routing-over-transport-initiator
Bridge
A common real world topology is a transport bridge.
Node n1 wishes to access a service on node n3, but it can't connect to n3 directly. This can happen for many reasons: n3 may be in a separate IP subnet, the communication from n1 to n2 may use UDP while the communication from n2 to n3 uses TCP, or there may be other similar constraints. The topology makes n2 a bridge or gateway between these two separate networks.
We can set up this topology with Ockam Routing as follows:
Responder node
// examples/04-routing-over-transport-two-hops-responder.rs
// This node starts a tcp listener and an echoer worker.
// It then runs forever waiting for messages.

use hello_ockam::Echoer;
use ockam::tcp::{TcpListenerOptions, TcpTransportExtension};
use ockam::{node, Context, Result};

#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
    // Create a node with default implementations
    let node = node(ctx).await?;

    // Initialize the TCP Transport
    let tcp = node.create_tcp_transport()?;

    // Create an echoer worker
    node.start_worker("echoer", Echoer)?;

    // Create a TCP listener and wait for incoming connections.
    let listener = tcp.listen("127.0.0.1:4000", TcpListenerOptions::new()).await?;

    // Allow access to the Echoer via TCP connections from the TCP listener
    node.flow_controls()
        .add_consumer(&"echoer".into(), listener.flow_control_id());

    // Don't call node.shutdown() here so this node runs forever.
    Ok(())
}
Middle node
Relay worker
We'll create a worker, called Relay, that takes every incoming message and forwards it along a predefined route.
// src/relay.rs
use ockam::{Any, Context, Result, Route, Routed, Worker};

pub struct Relay {
    route: Route,
}

impl Relay {
    pub fn new(route: impl Into<Route>) -> Self {
        let route = route.into();

        if route.is_empty() {
            panic!("Relay can't forward messages to an empty route");
        }

        Self { route }
    }
}

#[ockam::worker]
impl Worker for Relay {
    type Context = Context;
    type Message = Any;

    /// This handle function takes any incoming message and forwards
    /// it to the next hop in its onward route
    async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<Any>) -> Result<()> {
        println!("Address: {}, Received: {:?}", ctx.primary_address(), msg);

        let next_on_route = self.route.next()?.clone();

        // Some type conversion
        let mut local_message = msg.into_local_message();

        local_message = local_message.pop_front_onward_route()?;
        // Prepend predefined route to the onward_route
        local_message = local_message.prepend_front_onward_route(self.route.clone());

        let prev_hop = local_message.return_route().next()?.clone();

        if let Some(info) = ctx
            .flow_controls()
            .find_flow_control_with_producer_address(&next_on_route)
        {
            ctx.flow_controls()
                .add_consumer(&prev_hop, info.flow_control_id());
        }

        if let Some(info) = ctx
            .flow_controls()
            .find_flow_control_with_producer_address(&prev_hop)
        {
            ctx.flow_controls()
                .add_consumer(&next_on_route, info.flow_control_id());
        }

        // Send the message on its onward_route
        ctx.forward(local_message).await
    }
}
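The route rewriting that Relay performs can be isolated from the worker machinery. The following is a minimal std-only sketch, not part of the hello_ockam crate: relay_onward_route is a hypothetical helper, addresses are plain strings, and "tcp_to_responder" in the usage note is a made-up placeholder for the address of the TCP connection. It drops the relay's own address from the front of the onward_route and prepends the predefined route, mirroring the pop_front_onward_route and prepend_front_onward_route calls above.

```rust
/// Rewrites an onward route the way the Relay worker does:
/// drop the relay's own address from the front, then prepend the
/// predefined route. Addresses are plain strings in this sketch.
pub fn relay_onward_route(
    onward_route: &[&str],
    predefined: &[&str],
) -> Result<Vec<String>, String> {
    // Relay::new refuses an empty route; this sketch returns an error instead of panicking.
    if predefined.is_empty() {
        return Err("Relay can't forward messages to an empty route".to_string());
    }

    // The first address is the relay itself, so it is discarded.
    let (_own_address, rest) = onward_route
        .split_first()
        .ok_or_else(|| "onward_route is empty".to_string())?;

    Ok(predefined
        .iter()
        .chain(rest.iter())
        .map(|a| a.to_string())
        .collect())
}
```

With an incoming onward_route of ["forward_to_responder", "echoer"] and a predefined route of ["tcp_to_responder"], the result is ["tcp_to_responder", "echoer"]: the message leaves over the connection to the responder and still ends at the echoer worker.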
// examples/04-routing-over-transport-two-hops-middle.rs
// This node creates a tcp connection to a node at 127.0.0.1:4000
// Starts a relay worker to forward messages to 127.0.0.1:4000
// Starts a tcp listener at 127.0.0.1:3000
// It then runs forever waiting to route messages.

use hello_ockam::Relay;
use ockam::tcp::{TcpConnectionOptions, TcpListenerOptions, TcpTransportExtension};
use ockam::{node, Context, Result};

#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
    // Create a node with default implementations
    let node = node(ctx).await?;

    // Initialize the TCP Transport
    let tcp = node.create_tcp_transport()?;

    // Create a TCP connection to the responder node.
    let connection_to_responder = tcp.connect("127.0.0.1:4000", TcpConnectionOptions::new()).await?;

    // Create and start a Relay worker
    node.start_worker("forward_to_responder", Relay::new(connection_to_responder))?;

    // Create a TCP listener and wait for incoming connections.
    let listener = tcp.listen("127.0.0.1:3000", TcpListenerOptions::new()).await?;

    // Allow access to the Relay via TCP connections from the TCP listener
    node.flow_controls()
        .add_consumer(&"forward_to_responder".into(), listener.flow_control_id());

    // Don't call node.shutdown() here so this node runs forever.
    Ok(())
}
Initiator node
// examples/04-routing-over-transport-two-hops-initiator.rs
// This node routes a message, to a worker on a different node, over two tcp transport hops.

use ockam::tcp::{TcpConnectionOptions, TcpTransportExtension};
use ockam::{node, route, Context, Result};

#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
    // Create a node with default implementations
    let mut node = node(ctx).await?;

    // Initialize the TCP Transport
    let tcp = node.create_tcp_transport()?;

    // Create a TCP connection to the middle node.
    let connection_to_middle_node = tcp.connect("localhost:3000", TcpConnectionOptions::new()).await?;

    // Send a message to the "echoer" worker, on a different node, over two tcp hops.
    // Wait to receive a reply and print it.
    let r = route![connection_to_middle_node, "forward_to_responder", "echoer"];
    let reply = node.send_and_receive::<String>(r, "Hello Ockam!".to_string()).await?;
    println!("App Received: {}", reply); // should print "Hello Ockam!"

    // Stop all workers, stop the node, cleanup and return.
    node.shutdown().await
}
Run
Run the responder in a separate terminal tab and keep it running:
cargo run --example 04-routing-over-transport-two-hops-responder
Run the middle node in a separate terminal tab and keep it running:
cargo run --example 04-routing-over-transport-two-hops-middle
Run the initiator:
cargo run --example 04-routing-over-transport-two-hops-initiator
Relay
It is common, however, to encounter communication topologies where the machine that provides a service is unwilling or is not allowed to open a listening port or expose a bridge node to other networks. This is a common security best practice in enterprise environments, home networks, OT networks, and VPCs across clouds. Application developers may not have control over these choices from the infrastructure / operations layer. This is where relays are useful.
Relays make it possible to establish end-to-end protocols with services operating in a remote private network, without requiring a remote service to expose listening ports to an outside hostile network like the Internet.
Serialization
When transported over the wire, Ockam Routing messages have the following structure. TransportMessage is serialized using BARE Encoding. We intend to transition to CBOR in the near future since we already use CBOR for other protocols built on top of Ockam Routing.
Each transport type has a conventional value. TCP has transport type 1. UDP has transport type 2 etc. Node local messages have transport type 0.
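To make the role of the transport type concrete, here is a small std-only sketch. It is not the ockam crate's Address type: the Address struct, the constants, the "type#data" rendering, and the next_transport helper are all hypothetical names chosen for illustration. Only the numeric values 0, 1, and 2 come from the paragraph above.

```rust
use std::fmt;

// Transport type values named in the text above.
pub const LOCAL: u8 = 0;
pub const TCP: u8 = 1;
pub const UDP: u8 = 2;

/// Illustrative stand-in for an address: a transport type plus opaque data.
/// This is NOT the ockam crate's Address type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Address {
    pub transport_type: u8,
    pub data: String,
}

impl Address {
    pub fn new(transport_type: u8, data: &str) -> Self {
        Address {
            transport_type,
            data: data.to_string(),
        }
    }

    /// True when the address refers to a worker inside the current node.
    pub fn is_local(&self) -> bool {
        self.transport_type == LOCAL
    }
}

impl fmt::Display for Address {
    /// Renders as "<type>#<data>"; this notation is a choice made for the sketch.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}#{}", self.transport_type, self.data)
    }
}

/// Names the transport that must carry a message to the first address
/// of `route`, or returns None if the route is empty.
pub fn next_transport(route: &[Address]) -> Option<&'static str> {
    route.first().map(|a| match a.transport_type {
        LOCAL => "local",
        TCP => "tcp",
        UDP => "udp",
        _ => "unknown",
    })
}
```

For a route made of Address::new(TCP, "127.0.0.1:4000") followed by Address::new(LOCAL, "echoer"), next_transport reports "tcp" for the first hop, and the second address is local to whichever node receives the message.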
As a message moves within a node, it gathers additional metadata in structures like LocalMessage and RelayMessage, which are used for the node's internal operation.
Access Control
Each Worker has one or more addresses that it uses to send and receive messages. We assign each Address an Incoming Access Control and an Outgoing Access Control.
#[async_trait]
pub trait IncomingAccessControl: Debug + Send + Sync + 'static {
    /// Return true if the message is allowed to pass, and false if not.
    async fn is_authorized(&self, relay_msg: &RelayMessage) -> Result<bool>;
}

#[async_trait]
pub trait OutgoingAccessControl: Debug + Send + Sync + 'static {
    /// Return true if the message is allowed to pass, and false if not.
    async fn is_authorized(&self, relay_msg: &RelayMessage) -> Result<bool>;
}
Concrete instances of these traits inspect a message's onward_route, return_route, metadata etc. along with other node local state to decide if a message should be allowed to be sent or received. Incoming Access Control filters which messages reach an address while Outgoing Access Control decides which messages can be sent.
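As a rough illustration of what a concrete instance might look like, the sketch below uses simplified, synchronous stand-ins so that it compiles with the standard library alone. The RelayMessage struct here carries only a source and a destination, the trait drops async and the ockam Result type, and DenyAll, AllowSources, and filter_incoming are hypothetical names defined locally for this example.

```rust
/// Simplified stand-in for the message metadata an access control inspects.
#[derive(Debug, Clone)]
pub struct RelayMessage {
    pub source: String,
    pub destination: String,
}

/// Synchronous stand-in for the IncomingAccessControl trait.
pub trait IncomingAccessControl {
    /// Return true if the message is allowed to pass, and false if not.
    fn is_authorized(&self, relay_msg: &RelayMessage) -> Result<bool, String>;
}

/// Hypothetical policy: reject every message.
pub struct DenyAll;

impl IncomingAccessControl for DenyAll {
    fn is_authorized(&self, _relay_msg: &RelayMessage) -> Result<bool, String> {
        Ok(false)
    }
}

/// Hypothetical policy: accept messages only from the listed source addresses.
pub struct AllowSources(pub Vec<String>);

impl IncomingAccessControl for AllowSources {
    fn is_authorized(&self, relay_msg: &RelayMessage) -> Result<bool, String> {
        Ok(self.0.iter().any(|s| s == &relay_msg.source))
    }
}

/// Keeps only the messages that the given incoming policy authorizes.
pub fn filter_incoming<'a>(
    policy: &dyn IncomingAccessControl,
    messages: &'a [RelayMessage],
) -> Result<Vec<&'a RelayMessage>, String> {
    let mut accepted = Vec::new();
    for msg in messages {
        if policy.is_authorized(msg)? {
            accepted.push(msg);
        }
    }
    Ok(accepted)
}
```

Given one message from "app" and one from "stranger", filter_incoming with AllowSources(vec!["app".to_string()]) keeps only the first, while DenyAll keeps none. An outgoing policy would have the same shape but be consulted before a message leaves an address.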
Flow Control
In our threat model, we assume that Workers within a Node are not malicious against each other. If programmed correctly they intend no harm.
However, there are certain types of Workers that forward messages that were created on other nodes. We don't implicitly trust other Ockam Nodes so messages from them can be dangerous. Such workers that can receive messages from another node are implemented with an Outgoing Access Control that denies all messages by default.
For example, a TCP Transport Listener spawns a TCP Receiver for every new TCP connection. These receivers are implemented with an Outgoing Access Control that, by default, denies all messages from entering the node that is running the receiver. We can then explicitly allow messages to flow to specific addresses.
In the middle node example above, we do this by explicitly allowing flow of messages from the TCP Receivers (spawned by TCP Transport Listener) to the forward_to_responder worker.
// Create a TCP listener and wait for incoming connections.
let listener = tcp.listen("127.0.0.1:3000", TcpListenerOptions::new()).await?;

// Allow access to the Relay via TCP connections from the TCP listener
node.flow_controls()
    .add_consumer(&"forward_to_responder".into(), listener.flow_control_id());
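The deny-by-default behaviour can be pictured as a lookup table from flow control ids to the addresses allowed to consume their messages. The sketch below is a simplified std-only model and not the ockam crate's FlowControls type: the struct, the is_allowed method, the string ids, and the argument order are assumptions made for illustration.

```rust
use std::collections::{HashMap, HashSet};

/// Simplified model of a flow control registry: for each flow control id,
/// the set of addresses that may consume messages produced under that id.
#[derive(Debug, Default)]
pub struct FlowControls {
    consumers: HashMap<String, HashSet<String>>,
}

impl FlowControls {
    pub fn new() -> Self {
        Self::default()
    }

    /// Explicitly allow `address` to receive messages produced under `flow_control_id`.
    pub fn add_consumer(&mut self, address: &str, flow_control_id: &str) {
        self.consumers
            .entry(flow_control_id.to_string())
            .or_default()
            .insert(address.to_string());
    }

    /// Deny by default: a message passes only if its destination was
    /// registered as a consumer for the producer's flow control id.
    pub fn is_allowed(&self, flow_control_id: &str, destination: &str) -> bool {
        self.consumers
            .get(flow_control_id)
            .map_or(false, |addresses| addresses.contains(destination))
    }
}
```

In this model, a fresh registry rejects a message headed for "forward_to_responder". After add_consumer("forward_to_responder", "listener-3000"), where "listener-3000" is a made-up id standing in for listener.flow_control_id(), messages produced under that id are allowed to reach the relay, and every other destination stays blocked.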