JSON Schema

We would like to pass around a SensorReading object as our message. To do this easily, let’s use a JSON Schema.

We first create a new object of type SensorReading. You may need to include a no input constructor as well as getter and setter methods for your fields.

public class SensorReading {
    public String sensor_name;
    public float value;

    public SensorReading() {
    }

    public SensorReading (String name, float input) {
        this.sensor_name = name;
        this.value = input;
    }

    public String getSensorName() {
        return this.sensor_name;
    }

    public void setSensorName(String name) {
        this.sensor_name = name;
    }

    public float getValue() {
        return this.value;
    }

    public void setValue(float input) { this.value = input; }
}

Create a producer that uses SensorReading JSON Schema:

Producer<SensorReading> producer = client.newProducer(Schema.JSON(SensorReading.class))
.topic("persistent://public/default/jsontopic")
.create();

Note we will need to use a different topic. The conversion of a topic from a String Schema to the SensorReading JSON schema is not allowed by default.

Pass a SensorReading object directly into the value() method during message creation:

for (int i = 0; i < 10; i++) {
     SensorReading myReading = new SensorReading("mysensor", i);
     MessageId msgID = producer.newMessage().value(myReading).send();
     System.out.println("Publish " + myReading.getSensorName() + " " + myReading.getValue() + " and message ID " + msgID);
}

The consumer is created using a SensorReading JSON Schema:

Consumer consumer = client.newConsumer(Schema.JSON(SensorReading.class))
.topic("persistent://public/default/jsontopic")
.subscriptionName("mysubscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

The message received by the consumer is type SensorReading.

for (int i = 0; i < 1000; i++) {
     Message<SensorReading> msg = consumer.receive();
     consumer.acknowledge(msg);
     SensorReading myReading = msg.getValue();
     System.out.println("Receive message " + myReading.getSensorName() + " " + myReading.getValue());
}

The schema can be viewed in the StreamNative UI.