Publish-Subscribe (Pub-Sub) Design Pattern

Publish-Subscribe (Pub-Sub) Design Pattern

What is topic based Publish-Subscribe (Pub-Sub) design pattern?

Publishers are the entities who create/publish a message on a topic. Subscribers are the entities who subscribe to messages on a topic.

In a topic based Publish-Subscribe pattern, Publishers tag each message with the a topic instead of referencing specific Subscribers. Messaging system then sends the message to all Subscribers who have asked to receive messages on that topic.

Publishers only concern themselves with creating the original message and can leave the task of servicing the Subscribers to the messaging infrastructure (this is where pattern comes into picture).

Design of the Publish-Subscribe pattern:

Publish-Subscribe Design Pattern
Publish-Subscribe Design Pattern

There are three components in Pub-Sub pattern: Publisher, Subscriber and PubSub Service.

  1. Message Publisher sends messages to the PubSub Service without any knowledge of message subscribers
  2. Message Subscribers will only get the messages for which topics they are registered with the PubSubService. E.g. say we have three different topics (message types): a, b, c but only topic a is of interest to Subscriber 1, topics b and c are of interest to Subscriber 2 and Subscriber 3 wants messages for topics a and c. So subscriber 1 will be notified for topic a, Subscriber 2 will be notified for topics b and c, and Subscriber 3 will be notified for topics a and c by the PubSub Service.
  3. Publishers tag each message with a topic, and send it to the PubSubService which acts like a middleman between Publishers and receivers.
  4. Subscriber registers itself with PubSubService (middleman) and tells that it’s interested in messages related to a particular topic.
  5. Publisher-Subscriber pattern is highly loosely coupled architecture where the publishers don’t know who the subscribers are and subscribers don’t know who the publishers of topic are.
  6. While Pub-Sub pattern is very similar to Observer pattern there are 2 main differences:
    1. In Observer pattern, the observers are aware of the observables. In Publisher-Subscriber pattern the publishers and subscribers don’t need to know each other.
    2. Observer pattern is mostly implemented in a synchronous way i.e. the observable calls the appropriate method of all its observers when some event occurs. The Publisher-Subscriber pattern is mostly implemented in an asynchronous way (using message queue).

Implementation details:

  • Project Directory Structure in Eclipse.
Project directory structure
Project directory structure
  • Publisher interface defines the abstract method publish() which sends message to PubSub Service.
package pubsub.publisher;
import pubsub.Message;
import pubsub.service.PubSubService;

public interface Publisher {	
	//Publishes new message to PubSubService
	void publish(Message message, PubSubService pubSubService);
}
  • PublisherImpl class implements Publisher interface and implements publish method, which sends the message to PubSubService.
package pubsub.publisher;
import pubsub.Message;
import pubsub.service.PubSubService;

public class PublisherImpl implements Publisher {
	//Publishes new message to PubSubService
	public void publish(Message message, PubSubService pubSubService) {		
		pubSubService.addMessageToQueue(message);
	}
}
  • Subscriber is an abstract class which has below:
    • addSubscriber() – Adds/Registers subscriber for a topic with PubSub service.
    • unSubscribe() – Removes/Unsubscribes the subscriber for a topic with PubSub
    • List<Message> subscriberMessages – List of message which stores the messages received by Subscriber.
    • getMessagesForSubscriberOfTopic() – Method which requests for messages for subscriber of topic.
package pubsub.subscriber;

import java.util.ArrayList;
import java.util.List;
import pubsub.Message;
import pubsub.service.PubSubService;

public abstract class Subscriber {	
	//store all messages received by the subscriber
	private List<Message> subscriberMessages = new ArrayList<Message>();
	
	public List<Message> getSubscriberMessages() {
		return subscriberMessages;
	}
	public void setSubscriberMessages(List<Message> subscriberMessages) {
		this.subscriberMessages = subscriberMessages;
	}
	
	//Add subscriber with PubSubService for a topic
	public abstract void addSubscriber(String topic, PubSubService pubSubService);
	
	//Unsubscribe subscriber with PubSubService for a topic
	public abstract void unSubscribe(String topic, PubSubService pubSubService);
	
	//Request specifically for messages related to topic from PubSubService
	public abstract void getMessagesForSubscriberOfTopic(String topic, PubSubService pubSubService);
	
	//Print all messages received by the subscriber 
	public void printMessages(){
		for(Message message : subscriberMessages){
			System.out.println("Message Topic -> "+ message.getTopic() + " : " + message.getPayload());
		}
	}
}
  • SubscriberImpl class extends Subscriber and implements the abstract methods mentioned above.
package pubsub.subscriber;
import pubsub.service.PubSubService;

public class SubscriberImpl extends Subscriber{
	//Add subscriber with PubSubService for a topic
	public void addSubscriber(String topic, PubSubService pubSubService){
		pubSubService.addSubscriber(topic, this);
	}
	
	//Unsubscribe subscriber with PubSubService for a topic
	public void unSubscribe(String topic, PubSubService pubSubService){
		pubSubService.removeSubscriber(topic, this);
	}

	//Request specifically for messages related to topic from PubSubService
	public void getMessagesForSubscriberOfTopic(String topic, PubSubService pubSubService) {
		pubSubService.getMessagesForSubscriberOfTopic(topic, this);
		
	}	
}
  • Message class is a simple POJO class to represent the messages. It has topic attribute (for this subscriber is interested) and an attribute for message payload.
package pubsub;

public class Message {	
	private String topic;
	private String payload;
	
	public Message(){}	
	public Message(String topic, String payload) {
		this.topic = topic;
		this.payload = payload;
	}
	public String getTopic() {
		return topic;
	}
	public void setTopic(String topic) {
		this.topic = topic;
	}
	public String getPayload() {
		return payload;
	}
	public void setPayload(String payload) {
		this.payload = payload;
	}
}
  • PubSubService is the main class which has below:
    • Map<String, Set<Subscriber>> subscribersTopicMap – Stores the subscribers interested in a topic.
    • Queue<Message> messageQueue – Stores the messages published by publishers.
    • addMessageToQueue() – Adds message published by publishers to message queue.
    • addSubscriber () – Adds a subscriber for a topic.
    • removeSubscriber() – removes a subscriber for a topic.
    • broadcast() – Broadcast new messages added in queue to all subscribers of the topic. The messagesQueue will be empty after broadcasting is complete.
    • getMessagesForSubscriberOfTopic() – Sends messages about a topic for subscriber at any point.
package pubsub.service;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import pubsub.Message;
import pubsub.subscriber.Subscriber;

public class PubSubService {
	//Keeps set of subscriber topic wise, using set to prevent duplicates 
	Map<String, Set<Subscriber>> subscribersTopicMap = new HashMap<String, Set<Subscriber>>();

	//Holds messages published by publishers
	Queue<Message> messagesQueue = new LinkedList<Message>();

	//Adds message sent by publisher to queue
	public void addMessageToQueue(Message message){
		messagesQueue.add(message);
	}

	//Add a new Subscriber for a topic
	public void addSubscriber(String topic, Subscriber subscriber){

		if(subscribersTopicMap.containsKey(topic)){
			Set<Subscriber> subscribers = subscribersTopicMap.get(topic);
			subscribers.add(subscriber);
			subscribersTopicMap.put(topic, subscribers);
		}else{
			Set<Subscriber> subscribers = new HashSet<Subscriber>();
			subscribers.add(subscriber);
			subscribersTopicMap.put(topic, subscribers);
		}		
	}

	//Remove an existing subscriber for a topic
	public void removeSubscriber(String topic, Subscriber subscriber){

		if(subscribersTopicMap.containsKey(topic)){
			Set<Subscriber> subscribers = subscribersTopicMap.get(topic);
			subscribers.remove(subscriber);
			subscribersTopicMap.put(topic, subscribers);
		}
	}

	//Broadcast new messages added in queue to All subscribers of the topic. messagesQueue will be empty after broadcasting
	public void broadcast(){
		if(messagesQueue.isEmpty()){
			System.out.println("No messages from publishers to display");
		}else{
			while(!messagesQueue.isEmpty()){
				Message message = messagesQueue.remove();
				String topic = message.getTopic();

				Set<Subscriber> subscribersOfTopic = subscribersTopicMap.get(topic);

				for(Subscriber subscriber : subscribersOfTopic){
					//add broadcasted message to subscribers message queue
					List<Message> subscriberMessages = subscriber.getSubscriberMessages();
					subscriberMessages.add(message);
					subscriber.setSubscriberMessages(subscriberMessages);
				}			
			}
		}
	}

	//Sends messages about a topic for subscriber at any point
	public void getMessagesForSubscriberOfTopic(String topic, Subscriber subscriber) {
		if(messagesQueue.isEmpty()){
			System.out.println("No messages from publishers to display");
		}else{
			while(!messagesQueue.isEmpty()){
				Message message = messagesQueue.remove();

				if(message.getTopic().equalsIgnoreCase(topic)){

					Set<Subscriber> subscribersOfTopic = subscribersTopicMap.get(topic);

					for(Subscriber _subscriber : subscribersOfTopic){
						if(_subscriber.equals(subscriber)){
							//add broadcasted message to subscriber message queue
							List<Message> subscriberMessages = subscriber.getSubscriberMessages();
							subscriberMessages.add(message);
							subscriber.setSubscriberMessages(subscriberMessages);
						}
					}
				}
			}
		}
	}

}
  • DriverClass is the main class to run and test the Publisher-Subscriber design pattern. This is just for demo purpose and in real system we will have publishers and subscribers doing the same via API calls etc.
package pubsub;

import pubsub.publisher.Publisher;
import pubsub.publisher.PublisherImpl;
import pubsub.service.PubSubService;
import pubsub.subscriber.Subscriber;
import pubsub.subscriber.SubscriberImpl;

public class DriverClass {
	public static void main(String[] args) {
		
		//Instantiate publishers, subscribers and PubSubService 
		Publisher javaPublisher = new PublisherImpl();
		Publisher pythonPublisher = new PublisherImpl();
		
		Subscriber javaSubscriber = new SubscriberImpl();
		Subscriber allLanguagesSubscriber = new SubscriberImpl();
		Subscriber pythonSubscriber = new SubscriberImpl();
		
		PubSubService pubSubService = new PubSubService();
		
		//Declare Messages and Publish Messages to PubSubService
		Message javaMsg1 = new Message("Java", "Core Java Concepts");
		Message javaMsg2 = new Message("Java", "Spring MVC : Dependency Injection and AOP");
		Message javaMsg3 = new Message("Java", "JPA & Hibernate");
		
		javaPublisher.publish(javaMsg1, pubSubService);
		javaPublisher.publish(javaMsg2, pubSubService);
		javaPublisher.publish(javaMsg3, pubSubService);
		
		Message pythonMsg1 = new Message("Python", "Easy and Powerful programming language");
		Message pythonMsg2 = new Message("Python", "Advanced Python message");
		
		pythonPublisher.publish(pythonMsg1, pubSubService);
		pythonPublisher.publish(pythonMsg2, pubSubService);
		
		//Declare Subscribers 
		javaSubscriber.addSubscriber("Java",pubSubService);		//Java subscriber only subscribes to Java topics
		pythonSubscriber.addSubscriber("Python",pubSubService);   //Python subscriber only subscribes to Python topics
		allLanguagesSubscriber.addSubscriber("Java", pubSubService);	//all subscriber, subscribes to both Java and Python
		allLanguagesSubscriber.addSubscriber("Python", pubSubService);
		
		//Trying unSubscribing a subscriber
		//pythonSubscriber.unSubscribe("Python", pubSubService);
		
		//Broadcast message to all subscribers. After broadcast, messageQueue will be empty in PubSubService
		pubSubService.broadcast();
		
		//Print messages of each subscriber to see which messages they got
		System.out.println("Messages of Java Subscriber are: ");
		javaSubscriber.printMessages();
		
		System.out.println("\nMessages of Python Subscriber are: ");
		pythonSubscriber.printMessages();
		
		System.out.println("\nMessages of All Languages Subscriber are: ");
		allLanguagesSubscriber.printMessages();
		
		//After broadcast the messagesQueue will be empty, so publishing new messages to server
		System.out.println("\nPublishing 2 more Java Messages...");
		Message javaMsg4 = new Message("Java", "JSP and Servlets");
		Message javaMsg5 = new Message("Java", "Struts framework");
		
		javaPublisher.publish(javaMsg4, pubSubService);
		javaPublisher.publish(javaMsg5, pubSubService);
		
		javaSubscriber.getMessagesForSubscriberOfTopic("Java", pubSubService);
		System.out.println("\nMessages of Java Subscriber now are: ");
		javaSubscriber.printMessages();		
	}
}

Output:

Messages of Java Subscriber are: 
Message Topic -> Java : Core Java Concepts
Message Topic -> Java : Spring MVC : Dependency Injection and AOP
Message Topic -> Java : JPA & Hibernate

Messages of Python Subscriber are: 
Message Topic -> Python : Easy and Powerful programming language
Message Topic -> Python : Advanced Python message

Messages of All Languages Subscriber are: 
Message Topic -> Java : Core Java Concepts
Message Topic -> Java : Spring MVC : Dependency Injection and AOP
Message Topic -> Java : JPA & Hibernate
Message Topic -> Python : Easy and Powerful programming language
Message Topic -> Python : Advanced Python message

Publishing 2 more Java Messages...

Messages of Java Subscriber now are: 
Message Topic -> Java : Core Java Concepts
Message Topic -> Java : Spring MVC : Dependency Injection and AOP
Message Topic -> Java : JPA & Hibernate
Message Topic -> Java : JSP and Servlets
Message Topic -> Java : Struts framework

Thus we saw the design and implementation details of Publish-Subscribe Design Pattern.

Leave a Reply

Your email address will not be published. Required fields are marked *