分散式專案(五)iot-pgsql
阿新 • • 發佈:2019-04-21
書接上回,在Mapping server中,我們已經把資料都整理好了,現在利用postgresql儲存歷史資料。
iot-pgsql
構建iot-pgsql模組,這裡我們寫資料庫為了效能考慮不在使用mybatis,換成spring jdbc批處理寫資料庫。
引入spring jdbc依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> <version>2.1.4.RELEASE</version> </dependency>
因為資料來此訂閱的kafka資料,所以還需要引入kafka依賴,這裡已經kakfa隔離成了一個獨立的模組,所以加入kakfa模組就行了。
<dependency>
<groupId>cn.le</groupId>
<artifactId>iot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
kakfa監聽
[@Component](https://my.oschina.net/u/3907912) public class PgListener { @Autowired private PgService pgService; private static List<KafkaDownVO> insertList = new LinkedList<KafkaDownVO>(); @KafkaListener(topics = DOWN_TOPIC) public void pgListener(String msg){ System.out.println("-----------" + msg); List<KafkaDownVO> downVOList = JSONArray.parseArray(msg,KafkaDownVO.class); //批量寫入資料庫 insertList.addAll(downVOList); if (insertList.size() > 1000){ jdbcBachInsert(insertList); } } public void jdbcBachInsert(List<KafkaDownVO> downVOList){ if (CollectionUtils.isEmpty(downVOList)){ return; } pgService.jdbcBachInsert(downVOList); } }
jdbc批量寫入
[@Service](https://my.oschina.net/service) public class PgService { @Autowired private JdbcTemplate jdbcTemplate; @Transactional(rollbackFor = Exception.class) public void jdbcBachInsert(List<KafkaDownVO> list){ jdbcTemplate.batchUpdate(PG_INSERT_SQL, new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement ps, int i) throws SQLException { KafkaDownVO vo = list.get(i); ps.setLong(1,vo.getDeviceId()); ps.setLong(2,vo.getPropertyId()); ps.setString(3,vo.getData()); ps.setTimestamp(4,new Timestamp(vo.getCollTime(). getTime())); ps.setTimestamp(5,new Timestamp(System. currentTimeMillis())); } @Override public int getBatchSize() { return list.size(); } }); } }
OK,完成,啟動專案,看一下資料庫是否有寫入的資料
結束語
資料已經儲存到資料庫中了,下面就開始對裝置資料的監控了。