Simple IPC Using Named Pipes

If you’re reading this article, you’re probably familiar with Unix pipes. Perhaps less familiar are named pipes, a.k.a. FIFOs (first-in-first-out). Unlike conventional pipes, named pipes are represented on the filesystem and can be created with the mkfifo command in the terminal. See this screencast for an example:
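A FIFO can also be created from Python rather than the shell. The following is a minimal sketch, assuming a Unix system; the helper name make_fifo and the path demo_fifo are made up for illustration.

```python
import os
import stat
import tempfile


def make_fifo(path: str) -> bool:
    """Create a named pipe at `path` and report whether it really is a FIFO."""
    os.mkfifo(path)  # roughly what `mkfifo path` does in the terminal
    # A FIFO shows up on the filesystem with its own file type.
    return stat.S_ISFIFO(os.stat(path).st_mode)


if __name__ == "__main__":
    # Work in a throwaway directory so the example cleans up after itself.
    with tempfile.TemporaryDirectory() as tmp:
        fifo_path = os.path.join(tmp, "demo_fifo")
        print(make_fifo(fifo_path))
```

Note that os.mkfifo raises FileExistsError if the path already exists, so a long-running program may want to handle that case.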

A common use case for named pipes is simple and lightweight interprocess communication (IPC). I often use named pipes as a message queue, where a “writer” process sends messages into a named pipe, which are taken off at the other side by a “reader” process asynchronously.

Illustration of IPC using a named pipe

In this article, we will implement a simple IPC mechanism over a named pipe between two Python processes.

  1. Message format
  2. Writer process
  3. Reader process
  4. Demonstration
  5. Going further

Message format

Named pipes don’t have an in-built mechanism for marking the end of a message, so we need to define the message format ourselves.

The format of each message

One way of doing this is to split the message payload into two parts — the size of the message and the content of the message. The first part will always be exactly 4 bytes, encoded as an unsigned little-endian integer, representing the byte-size of the message content. When reading from the pipe, we read these initial 4 bytes which tell us how many more bytes to read until the end of the message. Let’s create a Python file called message.py to implement the message format.

# message.py
import struct

def encode_msg_size(size: int) -> bytes:
    return struct.pack("<I", size)

def decode_msg_size(size_bytes: bytes) -> int:
    return struct.unpack("<I", size_bytes)[0]

def create_msg(content: bytes) -> bytes:
    size = len(content)
    return encode_msg_size(size) + content
    
if __name__ == "__main__":
    print(encode_msg_size(12)) #=> b'\x0c\x00\x00\x00'
    print(decode_msg_size(b'\x0c\x00\x00\x00')) #=> 12

We use the standard library module struct for the encoding and decoding. Here < means little-endian and I means a 4-byte unsigned integer.
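To see why the length prefix is enough to recover message boundaries, here is a small sketch that splits a buffer of back-to-back messages into their payloads. The helper split_msgs is hypothetical (it is not part of message.py), and it assumes the buffer contains only complete messages.

```python
import struct
from typing import List

HEADER_SIZE = 4  # size in bytes of the "<I" length prefix


def create_msg(content: bytes) -> bytes:
    """Prefix `content` with its length, mirroring message.py."""
    return struct.pack("<I", len(content)) + content


def split_msgs(buffer: bytes) -> List[bytes]:
    """Split a buffer of complete, back-to-back framed messages."""
    payloads = []
    offset = 0
    while offset < len(buffer):
        # Read the 4-byte header to learn how long the payload is.
        (size,) = struct.unpack_from("<I", buffer, offset)
        start = offset + HEADER_SIZE
        payloads.append(buffer[start:start + size])
        # Jump to the header of the next message.
        offset = start + size
    return payloads


if __name__ == "__main__":
    stream = create_msg(b"Hello Ada!") + create_msg(b"Hello Bob!")
    print(split_msgs(stream))
```

The payloads contain no delimiter characters at all; the boundary between them is recovered purely from the size headers.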

Writer process

For the purposes of demonstration, let’s keep the writer process simple: read a name from input and send a greeting to the named pipe.

#!/usr/local/bin/python3
# writer.py
import os
from message import create_msg

if __name__ == "__main__":
    IPC_FIFO_NAME = "hello_ipc"

    fifo = os.open(IPC_FIFO_NAME, os.O_WRONLY)
    try:
        while True:
            name = input("Enter a name: ")
            content = f"Hello {name}!".encode("utf8")
            msg = create_msg(content)
            os.write(fifo, msg)
    except KeyboardInterrupt:
        print("\nGoodbye!")
    finally:
        os.close(fifo)

Reader process

The reader process polls the named pipe for new messages. When a message arrives, it simply prints it to the screen.

#!/usr/local/bin/python3
# reader.py
import os
import select
from message import decode_msg_size

def get_message(fifo: int) -> str:
    """Get a message from the named pipe."""
    msg_size_bytes = os.read(fifo, 4)
    msg_size = decode_msg_size(msg_size_bytes)
    msg_content = os.read(fifo, msg_size).decode("utf8")
    return msg_content


if __name__ == "__main__":
    # Make the named pipe and poll for new messages.
    IPC_FIFO_NAME = "hello_ipc"
    os.mkfifo(IPC_FIFO_NAME)
    try:
        # Open the pipe in non-blocking mode for reading
        fifo = os.open(IPC_FIFO_NAME, os.O_RDONLY | os.O_NONBLOCK)
        try:
            # Create a polling object to monitor the pipe for new data
            poll = select.poll()
            poll.register(fifo, select.POLLIN)
            try:
                while True:
                    # Check if there's data to read. Timeout after 2 sec.
                    if (fifo, select.POLLIN) in poll.poll(2000):
                        # Do something with the message
                        msg = get_message(fifo)
                        print(msg)
                    else:
                        # No data, do something else
                        print("Nobody here :(")
            finally:
                poll.unregister(fifo)
        finally:
            os.close(fifo)
    finally:
        # Delete the named pipe when the reader terminates
        os.remove(IPC_FIFO_NAME)
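One caveat about get_message above: os.read returns at most the requested number of bytes, and may return fewer. For the short greetings in this article that is unlikely to matter, but a more defensive reader could loop until the full count has arrived. The helper below is a hypothetical sketch, not part of reader.py. It assumes a blocking file descriptor; on a non-blocking one, os.read can raise BlockingIOError partway through, which the caller would need to handle.

```python
import os


def read_exact(fd: int, n: int) -> bytes:
    """Read exactly n bytes from fd, looping over short reads.

    Raises EOFError if the stream ends before n bytes have arrived.
    """
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = os.read(fd, remaining)
        if not chunk:
            # An empty read means every writer has closed the pipe.
            raise EOFError(f"expected {n} bytes, stream ended {remaining} short")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


if __name__ == "__main__":
    # An anonymous pipe stands in for the named pipe in this demo.
    r, w = os.pipe()
    os.write(w, b"\x05\x00\x00\x00Hello")
    os.close(w)
    print(read_exact(r, 4), read_exact(r, 5))
    os.close(r)
```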

There are a few important points of note in reader.py. First, we open the named pipe in non-blocking mode by specifying the bitmask os.O_RDONLY | os.O_NONBLOCK. Without os.O_NONBLOCK, the call to os.open blocks until another process opens the pipe for writing, and each subsequent read blocks until data arrives. Depending on your use case, blocking may be fine, but non-blocking mode is required if we want to operate in a “polling” regime. The Linux man page for read has more information on blocking vs. non-blocking reads.
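To make the non-blocking behaviour concrete, the sketch below records what a non-blocking reader sees in three situations: no writer connected, a writer connected but silent, and data available. The function name nonblocking_read_demo and the path are made up for illustration, and the observations assume Linux-style FIFO semantics.

```python
import os
import tempfile


def nonblocking_read_demo(path: str) -> list:
    """Return what a non-blocking reader observes in three situations."""
    observations = []
    os.mkfifo(path)
    # With O_NONBLOCK this open returns at once, even with no writer.
    reader = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    try:
        # 1. No writer has the pipe open: the read reports end-of-file.
        observations.append(os.read(reader, 4))
        # The reader end is already open, so this open does not block.
        writer = os.open(path, os.O_WRONLY)
        try:
            # 2. A writer is connected but has not sent anything yet.
            try:
                os.read(reader, 4)
            except BlockingIOError:
                observations.append("would block")
            # 3. Data is available and is returned immediately.
            os.write(writer, b"ping")
            observations.append(os.read(reader, 4))
        finally:
            os.close(writer)
    finally:
        os.close(reader)
        os.remove(path)
    return observations


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(nonblocking_read_demo(os.path.join(tmp, "demo_fifo")))
```

The second case is the reason to poll before reading: calling os.read on an empty non-blocking pipe raises an exception instead of waiting.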

Second, while we could manually check for new data on the pipe, it’s more efficient to have the operating system tell us when new data arrives. The select module in the standard library exposes several mechanisms for monitoring files. I’ve chosen to use poll, which is available on Unix systems. After creating the poll object, we use its register method to instruct it to monitor the named pipe for new data with the select.POLLIN flag.
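As a small illustration of the register and poll calls in isolation, the sketch below polls a FIFO once before and once after a write. The function name poll_demo and the path are assumptions for the example. Both ends of the pipe are opened in the same process purely to keep the demo self-contained.

```python
import os
import select
import tempfile


def poll_demo(path: str) -> tuple:
    """Poll a FIFO before and after a write; return (idle_events, ready)."""
    os.mkfifo(path)
    reader = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    writer = os.open(path, os.O_WRONLY)
    poller = select.poll()
    poller.register(reader, select.POLLIN)
    try:
        # Timeout of 0 ms: return immediately. Nothing has been written yet.
        idle_events = poller.poll(0)
        os.write(writer, b"ping")
        # Wait up to 1000 ms for the pipe to become readable.
        events = poller.poll(1000)
        # poll returns (fd, event mask) pairs; test the POLLIN bit.
        ready = any(fd == reader and mask & select.POLLIN for fd, mask in events)
        return idle_events, ready
    finally:
        poller.unregister(reader)
        os.close(writer)
        os.close(reader)
        os.remove(path)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(poll_demo(os.path.join(tmp, "demo_fifo")))
```

Testing the POLLIN bit with & rather than comparing whole tuples is a design choice: the event mask can carry additional flags, such as select.POLLHUP when the writer closes its end.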

Demonstration

Going further

Named pipes are a quick and lightweight way to achieve IPC between processes. The example in this article was a simple case of IPC: a single writer process communicating over a unidirectional channel with a single reader process. While it’s possible to extend named pipes to more complex scenarios, such as duplex communication or multiple readers and writers, there are more appropriate tools for the job, such as Unix domain sockets, or higher-level abstractions such as ZeroMQ and Nanomsg. Unlike named pipes, which only exist on the filesystem of a single host, ZeroMQ and Nanomsg have the capability to communicate over a network. Moving another step up the ladder of abstraction, we have dedicated message queues such as RabbitMQ and SQS, and heavy-duty distributed streaming platforms such as Apache Kafka.