Shared Subscription

Next, let’s test out the Shared Subscription type. We will start multiple consumers using the same subscription on the same topic and observe that multiple consumers will receive messages.

The Shared Subscription is specified in the consumer configuration. The consumer has a receiver queue with a default size of 1000. This may result in one of your consumers fetching a large number of messages and holding them in its receiver queue while the other consumers remain idle. Here we reduce the receiver queue size to one to keep the load balanced. In practice, having a receiver queue size this small may introduce inefficiencies.

Consumer consumer = client.newConsumer()
.topic("persistent://public/default/mynewtopic")
.subscriptionName("mysubscription")
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
//default receiverQueueSize is 1000, reduce to 1 for testing only to watch Shared Subscription distribute load
.receiverQueueSize(1)
.subscribe();

We used the following to simulate work for message processing:

import java.util.Random;

Random random = new Random();

//random wait to simulate work when demonstrating Shared subscription with small receive queue size
System.out.println("Received messaged " + new String(msg.getData()));
Thread.sleep(random.nextInt(10000));

There are no message ordering guarantees with a Shared Subscription.

We can see the updated subscription type in the StreamNative UI.