AMPQ Sensor API
This API is an event driven message bus based on RabbitMQ.
RabbitMQ is open source message broker software that implements the Advanced Message Queuing Protocol (AMQP).
Read more about RabbitMQ.
Queues
To receive events, you must subscribe to an authenticated queue assigned to your sensors.
The credentials provided will grant you read only permissions. You won't be able to declare or bind queues, you will receive an ACCESS_REFUSED error if you try.
Once events are consumed from your queue by a single client, they will no longer be available.
Your queue works in a Competing Consumers Pattern. That means consumers will process events concurrently.
If your client stops working, only the most recent 1500 events will be available on your queue. Thus, it will not be possible to retrieve older events than the last 1500.
Connecting and Subscribing
Example of how to connect
Dependencies [pika](https://pika.readthedocs.io)
import pika
def callback(ch, method, properties, body):
print(" [x] %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
credentials = pika.PlainCredentials('user', 'password')
params = pika.ConnectionParameters(host='host', port=5672, virtual_host='/', heartbeat=60, credentials=credentials)
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.basic_qos(prefetch_count=10)
channel.basic_consume('queue-name', callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()
Dependencies [amqp-client](https://mvnrepository.com/artifact/com.rabbitmq/amqp-client/5.8.0)
package com.voltaware.rabbitmqsimpleexample;
import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class App {
private final static String QUEUE_NAME = "queue-name";
private final static String RABBIT_HOST = "rabbit-host";
private final static String USERNAME = "user-name";
private final static String PASSWORD = "password";
public static void main(String[] argv)
throws java.io.IOException, java.lang.InterruptedException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RABBIT_HOST);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
final String message = new String(body, "UTF-8");
System.out.println(Instant.now() + " [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
}
}
Dependencies [node-amqp](https://github.com/postwait/node-amqp)
var amqp = require('amqp');
var options = {
host: 'host',
login: 'username',
password: 'password',
}
var connection = amqp.createConnection(options);
// add this for better debugging
connection.on('error', function(error) {
console.log("Error from amqp: ", error);
});
// Wait for connection to become established.
connection.on('ready', function () {
connection.queue('queue', { passive: true }, function (queue) {
// Receive messages
queue.subscribe(function (message) {
console.log(message);
});
});
});
You need these parameters to connect to the RabbitMQ Voltaware broker.
- Username
- Password
- Queue name
- Broker URL
You will start receiving events when you subscribe to your queue. See code samples.
Messages
Message Format
{
"sensor": {
"id": 123,
"firmwareVersion": 41,
"coilReversed": "OFF",
"mainsFrequencyHz": 50.156,
"wifiSignalStrength": -26,
"channel": 3
},
"event": {
"type": "ON"|"OFF"|"KEEP_ALIVE",
"state": "INCOMPLETE",
"power": {
"activeW": 4402,
"activeDeltaW": -1410,
"reactiveVar": 2051,
"reactiveDeltaVar": -12,
"apparentVa": 4927
},
"currentA": 20.251,
"voltageV": 246.843,
"phaseDeg": 24.9,
"energyConsumptionWh": 58768,
"energyConsumptionDeltaWh": 5,
"transients": [23.651, 25.037, 25.197, 25.197, 25.037, 23.744, 23.611, 24.144, 23.917, 24.077],
"harmonics": {
"real": [25072, 1525, 2564, -86, 319, 62, 875, -66, 323],
"imaginary": [11915, -995, 1251, 45, 2172, -113, 1211, -27, 1011],
"modulusDelta": [8339, 1301, 157, 114, 48, 46, 11, 73, 81],
"voltage": [349324, 871, 2354, 138, 5981, 112, 1490, 55, 2329]
},
"timestamp": "2020-03-26T10:51:28Z",
"timestampDeltaSecs": 13,
"eventId": 3790301,
"protocolVersion": 2,
"activePowerProd": 0,
"energyProd": 0,
"energyProdDelta": 0,
"activePowerCons": 971,
"energyCons": 8065693,
"energyConsDelta": 4
}
}
Queues will publish three types of events:
event.type | Description |
---|---|
ON | Everytime the sensor detects an increase on consumption. Tipically means an appliance has been switched on. |
OFF | Everytime the sensor detects an decrease on consumption.Tipically means an appliance has been switched off. |
KEEP_ALIVE | When the sensor detects no change on consumption for more than 2 minutes. |
These attributes are optional and will only be present in the message if your commercial agreement allows it:
- transients
- harmonics