
上QQ阅读APP看书,第一时间看更新
Sample code and frameworks for handling data
As we close this chapter, we will list the source code that we have used elsewhere in the book and summarize how you can connect to some of the data stores in order to persist the data. This will act as a reference for the reader, and can be used as easy access to the source code required for handling the IIoT data. Now, let's look at the following steps:
- Connecting to the database using Spring Framework, with Liquibase managing the schema; the JPA persistence unit (persistence.xml) is shown first (from Chapter 2, IIoT Application Architecture and Design):
<!-- JPA 2.0 persistence descriptor declaring one persistence unit, "iiotSamplePersistentUnit". -->
<persistence xmlns="http://java.sun.com/xml/ns/persistence"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" version="2.0"
xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd">
<!-- RESOURCE_LOCAL selects resource-local (non-JTA) transactions for this unit. -->
<persistence-unit name="iiotSamplePersistentUnit"
transaction-type="RESOURCE_LOCAL">
<description>Persistence Unit</description>
<!-- EclipseLink is the JPA provider. -->
<provider>org.eclipse.persistence.jpa.PersistenceProvider</provider>
<!-- Additional object/relational mappings are read from this file. -->
<mapping-file>META-INF/custom-orm.xml</mapping-file>
<!-- false: managed classes that are not listed explicitly in this unit are not excluded. -->
<exclude-unlisted-classes>false</exclude-unlisted-classes>
<properties>
<!-- General EclipseLink logging at FINE, with the SQL category overridden to INFO. -->
<property name="eclipselink.logging.level" value="FINE"/>
<property name="eclipselink.logging.level.sql" value="INFO"/>
<!-- NOTE(review): "static" weaving presumably requires the entities to be woven at build time; confirm the build does this. -->
<property name="eclipselink.weaving" value="static"/>
<property name="eclipselink.profiler" value="QueryMonitor"/> <!-- probably disable this in prod -->
<property name="eclipselink.jdbc.native-sql" value="true"/>
<!-- Query timeout hint; the JPA hint is expressed in milliseconds, so 10000 is 10 seconds. -->
<property name="javax.persistence.query.timeout" value="10000"/>
</properties>
</persistence-unit>
</persistence>
- Liquibase configuration and DDL scripts are given next; we are reading from an env property file and Spring automatically wires these properties to the appropriate bean. We will use the env property only when running locally; in a cloud deployment, we will use manifest.yml to bind to appropriate PostgreSQL instances, but the code does not need to change:
/**
 * Spring configuration that exposes the Liquibase migration bean for the
 * IIoT sample schema.
 *
 * <p>The {@code @Value} fields below are resolved from the
 * {@code vcap.services.<service-name>.credentials.*} properties, where the
 * service name comes from {@code iiot_sample_postgres_name} and falls back to
 * {@code iiot-sample-postgres}. They are injected but not read anywhere in
 * this class; the migration itself runs against the injected
 * {@link DataSource}.
 */
@Configuration
public class LiquibaseConfiguration {

    /** Location of the changelog handed to Liquibase. */
    private static final String CHANGELOG_LOCATION = "classpath:db/changelog.xml";

    /** Schema set as the Liquibase default schema. */
    private static final String DEFAULT_SCHEMA = "iiot-sample";

    /** Data source the migrations are executed against. */
    private final DataSource dataSource;

    /** JDBC driver class; defaults to the PostgreSQL driver when the property is absent. */
    @Value("${spring.datasource.driver-class-name:org.postgresql.Driver}")
    private String dataSourceDriverClassName;

    /** Connection URI taken from the bound PostgreSQL service credentials. */
    @Value("${vcap.services.${iiot_sample_postgres_name:iiot-sample-postgres}.credentials.uri}")
    private String dataSourceUrl;

    /** User name taken from the bound PostgreSQL service credentials. */
    @Value("${vcap.services.${iiot_sample_postgres_name:iiot-sample-postgres}.credentials.username}")
    private String dataSourceUsername;

    /** Password taken from the bound PostgreSQL service credentials. */
    @Value("${vcap.services.${iiot_sample_postgres_name:iiot-sample-postgres}.credentials.password}")
    private String dataSourcePassword;

    /**
     * Creates the configuration.
     *
     * @param dataSource data source later passed to the Liquibase bean
     */
    @Autowired
    public LiquibaseConfiguration(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Builds the Liquibase bean: it points at the changelog, uses the injected
     * data source and the sample schema, never drops the schema first, and is
     * enabled to run.
     *
     * @param tenantDataSourceConfig tenant configuration passed to the
     *                               {@code SmarterSpringLiquibase} constructor
     * @return the configured Liquibase bean
     */
    @Bean
    public SpringLiquibase liquibase(TenantDataSourceConfig tenantDataSourceConfig) {
        final SmarterSpringLiquibase migrator = new SmarterSpringLiquibase(tenantDataSourceConfig);
        migrator.setChangeLog(CHANGELOG_LOCATION);
        migrator.setDataSource(this.dataSource);
        migrator.setDefaultSchema(DEFAULT_SCHEMA);
        migrator.setDropFirst(false);
        migrator.setShouldRun(true);
        return migrator;
    }
}
- DDL Scripts for Liquibase; all the DB scripts are saved in the resources/db directory:
-- Alerts table: a generated numeric key, the alert's own UUID, descriptive
-- fields, audit columns, and the UUID of the owning tenant.
CREATE TABLE alerts
(
-- Auto-incrementing surrogate key; used as the primary key below.
id bigserial NOT NULL,
alerts_uuid text NOT NULL,
severity integer,
alert_name text,
alert_info text,
-- Audit columns: creator and creation time are mandatory, and the creation
-- time defaults to the time of insertion.
created_by text NOT NULL,
created_date timestamp with time zone NOT NULL DEFAULT now(),
updated_by text,
updated_date timestamp with time zone,
tenant_uuid text NOT NULL,
CONSTRAINT alerts_pkey PRIMARY KEY (id)
);
-- The (alerts_uuid, tenant_uuid) pair must be unique.
CREATE UNIQUE INDEX ALERTS_TENANT_IDX ON Alerts(alerts_uuid, tenant_uuid);
- From Chapter 3, IIoT Edge Development: to connect to Postgres using Node.js, create the ./receiver/index.js file with the following content, replacing the database credentials with the correct values:
const http = require('http');
const querystring = require('querystring');
const Pool = require('pg').Pool;

// Largest request body, in bytes, that is buffered before the request is rejected.
const MAX_BODY_BYTES = 1024 * 1024;

// Placeholder credentials: replace with the real database settings.
const pool = new Pool({
  user: 'user',
  database: 'database',
  password: 'password',
  host: 'host',
  port: 5432
});

//ensure table exists in db
pool.query('CREATE TABLE IF NOT EXISTS "sensor-logs" (id serial NOT NULL PRIMARY KEY, data json NOT NULL)', function (err) {
  if (err) console.log(err);
});

/**
 * Writes the status line and a short body, then ends the response.
 *
 * @param {http.ServerResponse} res - response to complete
 * @param {number} statusCode - HTTP status code
 * @param {string} statusMessage - HTTP reason phrase
 * @param {string} body - response body
 */
function reply(res, statusCode, statusMessage, body) {
  res.writeHead(statusCode, statusMessage, {'Content-Type': 'text/html'});
  res.end(body);
}

// Receives URL-encoded sensor readings and stores each request body as one row.
http.createServer(function (req, res) {
  const chunks = [];
  let receivedBytes = 0;
  let rejected = false;

  // A body may arrive in several chunks, so buffer them and parse once at
  // 'end'. Parsing chunk by chunk would store partial key/value pairs as
  // separate rows and could split multi-byte characters.
  req.on('data', function (chunk) {
    if (rejected) return;
    receivedBytes += chunk.length;
    if (receivedBytes > MAX_BODY_BYTES) {
      rejected = true;
      chunks.length = 0;
      reply(res, 413, 'Payload Too Large', 'payload too large');
      return;
    }
    chunks.push(chunk);
  });

  req.on('error', function (err) {
    console.log(err);
  });

  req.on('end', function () {
    if (rejected) return;

    // Requests without a body are acknowledged without touching the database.
    if (receivedBytes === 0) {
      reply(res, 200, 'OK', 'ok');
      return;
    }

    const data = querystring.parse(Buffer.concat(chunks).toString());
    console.log(data);

    //save in db
    pool.query('INSERT INTO "sensor-logs" (data) VALUES ($1)', [data], function (err) {
      if (err) {
        console.log(err);
        // Report the failure instead of claiming the reading was stored.
        reply(res, 500, 'Internal Server Error', 'error');
        return;
      }
      reply(res, 200, 'OK', 'ok');
    });
  });
}).listen(process.env.PORT || 8080);
- Covered in Chapter 6, Developing Your First Application for IIoT. Connecting to and using InfluxDB:
const restify = require('restify');
const server = restify.createServer({name: 'MyApp'});
const Influx = require('influx');
const alertEmail = require('./alert-email');
// Keep the module's factory here. Calling it at require time and then calling
// the result again with the options below would invoke the sender with an
// options object instead of configuring it.
const slack = require('slack-incoming-webhook');

// Bounds of the simulated reading, the value from which alerts are raised,
// and the sampling period in milliseconds.
const min = 0;
const max = 100;
const alertFrom = 90;
const interval = 1000;

// Slack sender used for alerts; replace the placeholder URL and channel.
const alertSlack = slack({
  url: 'https://hooks.slack.com/services/xxx/yyy/zzz',
  channel: '#channel-name',
  icon: ':warning:'
});

// create connection
const influx = new Influx.InfluxDB({
  host: 'localhost',
  database: 'mydb',
  // One measurement, "sensor1", with an integer field and a "device" tag.
  schema: [
    {
      measurement: 'sensor1',
      fields: {
        variable1: Influx.FieldType.INTEGER
      },
      tags: [
        'device'
      ]
    }
  ]
});
/**
 * Generates one simulated reading between min and max, raises the email and
 * Slack alerts when the reading reaches alertFrom, and writes the reading to
 * the "sensor1" measurement.
 */
function emitSimulatedReading() {
  const reading = Math.floor(max - Math.random() * (max - min));

  if (reading >= alertFrom) {
    const alertText = 'variable exceeded ' + reading + ' rpm';
    alertEmail(alertText);
    alertSlack(alertText);
  }

  const point = {
    measurement: 'sensor1',
    tags: {device: 'raspberry'},
    fields: {variable1: reading}
  };

  influx.writePoints([point])
    .then(function () {
      console.log(reading);
    })
    .catch(function (err) {
      console.error(err);
    });
}

// simulate receiving data from sensors
setInterval(emitSimulatedReading, interval);
/**
 * Runs an InfluxQL query and answers the request with its result as JSON, or
 * with status 500 and the error stack when the query fails. The handler chain
 * continues only after a response has been written.
 *
 * @param {string} influxQl - query text passed to influx.query
 * @param {object} res - restify response
 * @param {Function} next - restify continuation callback
 */
function sendQueryResult(influxQl, res, next) {
  influx.query(influxQl)
    .then(function (result) {
      res.json(result);
    })
    .catch(function (err) {
      // restify's res.status() returns the numeric code, not the response,
      // so the Express-style res.status(500).send(...) would throw here.
      res.send(500, err.stack);
    })
    .then(function () {
      next();
    });
}

// Allow cross-origin requests from any origin.
server.use(function search(req, res, next) {
  res.header('Access-Control-Allow-Origin', '*');
  res.header('Access-Control-Allow-Headers', 'X-Requested-With');
  return next();
});

// Readings from the last five minutes.
server.get('/last', function search(req, res, next) {
  sendQueryResult('select * from sensor1 WHERE time > now() - 5m', res, next);
});

// Aggregate statistics over the last hour.
server.get('/stats', function search(req, res, next) {
  sendQueryResult('SELECT COUNT(*), MEAN(*), MEDIAN(*), MODE(*), SPREAD(*), STDDEV(*), SUM(*), FIRST(*), LAST(*), MAX(*), MIN(*), PERCENTILE(*, 5) FROM "sensor1" WHERE time > now() - 1h', res, next);
});

// Static files for paths under /public, resolved relative to this script's directory.
server.get(/\/public\/?.*/, restify.plugins.serveStatic({
  directory: __dirname
}));

server.listen(8080);