1. 程式人生 > >分散式專案(五)iot-pgsql

分散式專案(五)iot-pgsql

書接上回,在Mapping server中,我們已經把資料都整理好了,現在利用postgresql儲存歷史資料。

iot-pgsql

構建iot-pgsql模組,這裡我們寫資料庫為了效能考慮不在使用mybatis,換成spring jdbc批處理寫資料庫。

引入spring jdbc依賴

	<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
            <version>2.1.4.RELEASE</version>
        </dependency>

因為資料來此訂閱的kafka資料,所以還需要引入kafka依賴,這裡已經kakfa隔離成了一個獨立的模組,所以加入kakfa模組就行了。

	<dependency>
            <groupId>cn.le</groupId>
            <artifactId>iot-kafka</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

kakfa監聽

[@Component](https://my.oschina.net/u/3907912)
public class PgListener {

    @Autowired
    private PgService pgService;
    private static List<KafkaDownVO> insertList = new LinkedList<KafkaDownVO>();

    @KafkaListener(topics = DOWN_TOPIC)
    public void pgListener(String msg){
        System.out.println("-----------" + msg);
        List<KafkaDownVO> downVOList = JSONArray.parseArray(msg,KafkaDownVO.class);

        //批量寫入資料庫
        insertList.addAll(downVOList);
        if (insertList.size() > 1000){
            jdbcBachInsert(insertList);
        }
    }

    public void jdbcBachInsert(List<KafkaDownVO> downVOList){
        if (CollectionUtils.isEmpty(downVOList)){
            return;
        }
        pgService.jdbcBachInsert(downVOList);
    }
}

jdbc批量寫入

[@Service](https://my.oschina.net/service)
public class PgService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Transactional(rollbackFor = Exception.class)
    public void jdbcBachInsert(List<KafkaDownVO> list){
        jdbcTemplate.batchUpdate(PG_INSERT_SQL, 
		new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) 
			throws SQLException {
                KafkaDownVO vo = list.get(i);
                ps.setLong(1,vo.getDeviceId());
                ps.setLong(2,vo.getPropertyId());
                ps.setString(3,vo.getData());
                ps.setTimestamp(4,new Timestamp(vo.getCollTime().
				getTime()));
                ps.setTimestamp(5,new Timestamp(System.
				currentTimeMillis()));
            }

            @Override
            public int getBatchSize() {
                return list.size();
            }
        });
    }
}

OK,完成,啟動專案,看一下資料庫是否有寫入的資料

結束語

資料已經儲存到資料庫中了,下面就開始對裝置資料的監控了。

https://gitee.com/d