AVRO Schema

Continuing with the SensorReading object, let’s use an AVRO schema derived from the SensorReading class with our producer and consumer.
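For reference, here is a minimal sketch of what the SensorReading class might look like. The field names and types are assumptions inferred from the constructor and getter calls used in this section (a String sensor name and an int value); the actual class may differ. A no-argument constructor is included because reflection-based deserialization generally needs a way to instantiate the class.

```java
// Hypothetical SensorReading POJO; field names and types are assumed, not taken from the source.
// Declared without `public` so the sketch compiles in any file; in a project it would
// typically be a public class in its own SensorReading.java.
class SensorReading {
    private String sensorName; // assumed: identifier of the sensor
    private int value;         // assumed: an int, matching the loop counter passed by the producer

    // No-argument constructor so the object can be instantiated reflectively.
    SensorReading() {
    }

    SensorReading(String sensorName, int value) {
        this.sensorName = sensorName;
        this.value = value;
    }

    public String getSensorName() {
        return sensorName;
    }

    public void setSensorName(String sensorName) {
        this.sensorName = sensorName;
    }

    public int getValue() {
        return value;
    }

    public void setValue(int value) {
        this.value = value;
    }
}
```

With a class shaped like this, AvroSchema.of(SensorReading.class) can derive a record schema whose fields mirror sensorName and value.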

Create a producer that uses the SensorReading AVRO schema:

Producer<SensorReading> producer = client.newProducer(AvroSchema.of(SensorReading.class))
     .topic("persistent://public/default/avrotopic")
     .create();

Pass a SensorReading object directly into the value() method during message creation:

for (int i = 0; i < 10; i++) {
     SensorReading myReading = new SensorReading("mysensor", i);
     MessageId msgID = producer.newMessage().value(myReading).send();
     System.out.println("Publish " + myReading.getSensorName() + " " + myReading.getValue() + " and message ID " + msgID);
}

The consumer is created using a SensorReading AVRO Schema:

Consumer<SensorReading> consumer = client.newConsumer(AvroSchema.of(SensorReading.class))
     .topic("persistent://public/default/avrotopic")
     .subscriptionName("mysubscription")
     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
     .subscribe();

Each message received by the consumer carries a value of type SensorReading:

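// Receive the 10 messages published above; receive() blocks until a message arrives.
for (int i = 0; i < 10; i++) {
     Message<SensorReading> msg = consumer.receive();
     SensorReading myReading = msg.getValue();
     System.out.println("Receive message " + myReading.getSensorName() + " " + myReading.getValue());
     // Acknowledge only after the message has been processed.
     consumer.acknowledge(msg);
}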

The schema can be viewed in the StreamNative UI.