NAV Navbar
python java javascript
  • ◀ Back to Doc's Home
  • AMQP Sensor API
  • Queues
  • Connecting and Subscribing
  • Messages
  • AMQP Sensor API

    This API is an event-driven message bus based on RabbitMQ.

    RabbitMQ is open source message broker software that implements the Advanced Message Queuing Protocol (AMQP).

    Read more about RabbitMQ.

    Queues

    To receive events, you must subscribe to an authenticated queue assigned to your sensors.

    The credentials provided grant you read-only permissions. You won't be able to declare or bind queues; if you try, you will receive an ACCESS_REFUSED error.

    Once events are consumed from your queue by a single client, they will no longer be available.

    Your queue works in a Competing Consumers Pattern. That means consumers will process events concurrently.

    If your client stops working, only the most recent 1500 events will be kept on your queue. Events older than those 1500 cannot be retrieved.

    Connecting and Subscribing

    Example of how to connect

    Dependencies [pika](https://pika.readthedocs.io)
    
    import pika
    
    def callback(ch, method, properties, body):
        """Print the raw message body, then acknowledge the delivery.

        ch         -- channel object; its basic_ack() is called with the delivery tag.
        method     -- delivery metadata; only delivery_tag is read here.
        properties -- message properties (unused).
        body       -- message payload, printed using its repr().
        """
        line = " [x] %r" % body
        print(line)
        # Acknowledge only after the message has been printed.
        tag = method.delivery_tag
        ch.basic_ack(delivery_tag=tag)
    
    # Connection settings: replace 'user', 'password' and 'host' with the values issued to you.
    credentials = pika.PlainCredentials('user', 'password')
    params = pika.ConnectionParameters(
        host='host',
        port=5672,
        virtual_host='/',
        heartbeat=60,
        credentials=credentials,
    )
    connection = pika.BlockingConnection(params)
    channel = connection.channel()

    # Request a prefetch count of 10, then register `callback` as the consumer of the queue.
    channel.basic_qos(prefetch_count=10)
    channel.basic_consume('queue-name', callback)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    try:
        # Blocks here, dispatching deliveries to `callback`.
        channel.start_consuming()
    except KeyboardInterrupt:
        # CTRL+C: stop consuming first, then close the connection.
        channel.stop_consuming()
        connection.close()
    
    Dependencies [amqp-client](https://mvnrepository.com/artifact/com.rabbitmq/amqp-client/5.8.0)
    
    package com.voltaware.rabbitmqsimpleexample;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.time.Instant;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    /**
     * Minimal console consumer: connects to the broker, subscribes to a single queue and prints
     * every message body it receives, prefixed with the time of receipt.
     */
    public class App {
    
        /** Name of the queue to consume from; replace with the queue assigned to you. */
        private static final String QUEUE_NAME = "queue-name";
        /** Broker host name. */
        private static final String RABBIT_HOST = "rabbit-host";
        /** Broker user name. */
        private static final String USERNAME = "user-name";
        /** Broker password. */
        private static final String PASSWORD = "password";
    
        /**
         * Opens a connection and a channel, registers the printing consumer on
         * {@link #QUEUE_NAME} and prints a "waiting" banner.
         *
         * @param argv command-line arguments (unused)
         * @throws IOException if the connection, the channel or the subscription cannot be set up
         * @throws InterruptedException declared for backward compatibility with the original signature
         * @throws TimeoutException if opening the connection times out
         */
        public static void main(String[] argv)
                throws IOException, InterruptedException, TimeoutException {
    
            final ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(RABBIT_HOST);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
    
            // The connection and channel are intentionally left open: the consumer keeps
            // receiving deliveries until the process is terminated (CTRL+C).
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    // Use the charset constant rather than the "UTF-8" name: no charset lookup by
                    // name and no checked UnsupportedEncodingException path.
                    final String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println(Instant.now() + " [x] Received '" + message + "'");
                }
            };
            // Second argument is autoAck = true: deliveries are acknowledged automatically.
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        }
    }
    
    Dependencies [node-amqp](https://github.com/postwait/node-amqp)
    
    var amqp = require('amqp');
    
    // Connection settings: replace with the host and credentials issued to you.
    var options = {
      host: 'host',
      login: 'username',
      password: 'password'
    };
    
    var connection = amqp.createConnection(options);
    
    // Log connection errors; add this for better debugging.
    function onError(error) {
      console.log("Error from amqp: ", error);
    }
    
    // Print every message received from the queue.
    function onMessage(message) {
      console.log(message);
    }
    
    // Called with the queue object; start receiving messages.
    function onQueue(queue) {
      queue.subscribe(onMessage);
    }
    
    // Called once the connection is established; open the queue with the `passive` option set.
    function onReady() {
      connection.queue('queue', { passive: true }, onQueue);
    }
    
    connection.on('error', onError);
    connection.on('ready', onReady);
    

    You need these parameters to connect to the RabbitMQ Voltaware broker.

    You will start receiving events when you subscribe to your queue. See code samples.

    Messages

    Message Format

    {
        "sensor": {
            "id": 123,
            "firmwareVersion": 41,
            "coilReversed": "OFF",
            "mainsFrequencyHz": 50.156,
            "wifiSignalStrength": -26,
            "channel": 3
        },
        "event": {
            "type": "ON"|"OFF"|"KEEP_ALIVE",
            "state": "INCOMPLETE",
            "power": {
                "activeW": 4402,
                "activeDeltaW": -1410,
                "reactiveVar": 2051,
                "reactiveDeltaVar": -12,
                "apparentVa": 4927
            },
            "currentA": 20.251,
            "voltageV": 246.843,
            "phaseDeg": 24.9,
            "energyConsumptionWh": 58768,
            "energyConsumptionDeltaWh": 5,
            "transients": [23.651, 25.037, 25.197, 25.197, 25.037, 23.744, 23.611, 24.144, 23.917, 24.077],
            "harmonics": {
                "real": [25072, 1525, 2564, -86, 319, 62, 875, -66, 323],
                "imaginary": [11915, -995, 1251, 45, 2172, -113, 1211, -27, 1011],
                "modulusDelta": [8339, 1301, 157, 114, 48, 46, 11, 73, 81],
                "voltage": [349324, 871, 2354, 138, 5981, 112, 1490, 55, 2329]
            },
            "timestamp": "2020-03-26T10:51:28Z",
            "timestampDeltaSecs": 13,
            "eventId": 3790301,
            "protocolVersion": 2,
            "activePowerProd": 0,
            "energyProd": 0,
            "energyProdDelta": 0,
            "activePowerCons": 971,
            "energyCons": 8065693,
            "energyConsDelta": 4
        }
    }
    

    Queues will publish three types of events:

    event.type Description
    ON Every time the sensor detects an increase in consumption. Typically means an appliance has been switched on.
    OFF Every time the sensor detects a decrease in consumption. Typically means an appliance has been switched off.
    KEEP_ALIVE When the sensor detects no change in consumption for more than 2 minutes.

    These attributes are optional and will only be present in the message if your commercial agreement allows it: