JSON Schema
We would like to send a SensorReading object as the payload of our messages. The simplest way to do this is with a JSON schema.
First, define the SensorReading class. You may need to include a no-argument constructor as well as getter and setter methods for its fields.
public class SensorReading {
    public String sensor_name;
    public float value;

    public SensorReading() {
    }

    public SensorReading(String name, float input) {
        this.sensor_name = name;
        this.value = input;
    }

    public String getSensorName() {
        return this.sensor_name;
    }

    public void setSensorName(String name) {
        this.sensor_name = name;
    }

    public float getValue() {
        return this.value;
    }

    public void setValue(float input) {
        this.value = input;
    }
}
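To see why the no-argument constructor and the setters matter, it helps to know that JSON deserializers commonly create an empty object reflectively and then populate it field by field. The following is a minimal sketch of that pattern using only the Java standard library. It is not Pulsar's actual deserialization code, and the class name NoArgConstructorSketch and the helper build are hypothetical names introduced only for illustration.

```java
import java.lang.reflect.Method;

class NoArgConstructorSketch {

    // Same shape as the SensorReading class above, repeated so the sketch compiles on its own.
    public static class SensorReading {
        public String sensor_name;
        public float value;

        public SensorReading() {
        }

        public String getSensorName() { return this.sensor_name; }
        public void setSensorName(String name) { this.sensor_name = name; }
        public float getValue() { return this.value; }
        public void setValue(float input) { this.value = input; }
    }

    // Hypothetical helper: builds an instance the way a reflective deserializer might.
    static SensorReading build(String name, float value) {
        try {
            // Step 1: create an empty object. This lookup fails if the class
            // has no no-argument constructor.
            SensorReading reading = SensorReading.class.getDeclaredConstructor().newInstance();

            // Step 2: find the setters by name and call them with the decoded values.
            Method setName = SensorReading.class.getMethod("setSensorName", String.class);
            Method setValue = SensorReading.class.getMethod("setValue", float.class);
            setName.invoke(reading, name);
            setValue.invoke(reading, value);
            return reading;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not build SensorReading reflectively", e);
        }
    }

    public static void main(String[] args) {
        SensorReading reading = build("mysensor", 3.0f);
        System.out.println(reading.getSensorName() + " " + reading.getValue()); // prints "mysensor 3.0"
    }
}
```

If the no-argument constructor were removed from this class, the getDeclaredConstructor() call would throw NoSuchMethodException, which is the kind of failure the advice above is meant to avoid.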
Create a producer that uses the SensorReading JSON schema:
Producer<SensorReading> producer = client.newProducer(Schema.JSON(SensorReading.class))
        .topic("persistent://public/default/jsontopic")
        .create();
Note that this requires a different topic from the one used with the String schema. By default, changing a topic's schema from a String schema to the SensorReading JSON schema is not allowed.
Pass a SensorReading object directly into the value() method during message creation:
for (int i = 0; i < 10; i++) {
    SensorReading myReading = new SensorReading("mysensor", i);
    MessageId msgID = producer.newMessage().value(myReading).send();
    System.out.println("Publish " + myReading.getSensorName() + " " + myReading.getValue() + " and message ID " + msgID);
}
The consumer is created using the same SensorReading JSON schema:
Consumer<SensorReading> consumer = client.newConsumer(Schema.JSON(SensorReading.class))
        .topic("persistent://public/default/jsontopic")
        .subscriptionName("mysubscription")
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();
The value of each message received by the consumer is of type SensorReading:
for (int i = 0; i < 1000; i++) {
    // receive() blocks until a message is available, so this loop waits
    // once every published message has been consumed.
    Message<SensorReading> msg = consumer.receive();
    consumer.acknowledge(msg);
    SensorReading myReading = msg.getValue();
    System.out.println("Receive message " + myReading.getSensorName() + " " + myReading.getValue());
}
The schema can be viewed in the StreamNative UI.
