
【TensorFlow】A First FCN Network

While learning TensorFlow I wrote an extremely simple fully convolutional network (FCN). It produces no useful results; it merely runs end to end, and there is no dropout (a sketch of how dropout could be wired in follows the script).

#!/usr/bin/env python
#coding:utf-8
from __future__ import absolute_import
from __future__ import division

import os

import cv2
import numpy as np
import tensorflow as tf

def weight_variable(shape):
	# Initialize weights from a truncated normal distribution
	initial = tf.truncated_normal(shape, stddev = 0.01)
	return tf.Variable(initial)

def bias_variable(shape):
	# Initialize biases to zero
	return tf.Variable(tf.constant(0.0, shape = shape))

def conv_layer(x, W, b):
	# W has shape [ksize, ksize, in_channels, out_channels]
	conv = tf.nn.conv2d(x, W, strides = [1, 1, 1, 1], padding = 'SAME')
	conv_b = tf.nn.bias_add(conv, b)
	conv_relu = tf.nn.relu(conv_b)
	return conv_relu
	
def max_pool_layer(x):
	# 2x2 max pooling with stride 2 halves the spatial dimensions
	return tf.nn.max_pool(x, ksize = [1, 2, 2, 1], strides = [1, 2, 2, 1], padding = 'SAME')
	
def deconv_layer(x, W, output_shape, b):
	# strides = 2 gives 2x upsampling
	# output_shape = [batch_size, output_height, output_width, output_channels]; note that the first entry is batch_size
	# W has shape [ksize, ksize, out_channels, in_channels]; the last two dimensions are the reverse of a forward convolution
	deconv = tf.nn.conv2d_transpose(x, W, output_shape, strides = [1, 2, 2, 1], padding = 'SAME')
	return tf.nn.bias_add(deconv, b)
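
# A quick shape check for deconv_layer (a sketch using this network's sizes):
# with input of shape [batch_size, 64, 64, 256], weights [3, 3, 128, 256], and
# stride 2 with padding 'SAME', conv2d_transpose yields
# output_shape = [batch_size, 128, 128, 128]: height and width double
# (out_size = in_size * stride) while channels go from 256 (in) to 128 (out).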
	
# Load the data
def get_data(image_path, label_path):
	image_list = os.listdir(image_path)
	label_list = os.listdir(label_path)
	image_list_arr = []
	label_list_arr = []
	for file in image_list:
		if file.endswith('png'):
			# cv2.imread(path, -1) loads the file unchanged; without the -1 flag it is decoded as a colour image and becomes three channels
			image = cv2.imread(os.path.join(image_path,file),-1)
			#image = transform.resize(image, (512,512))
			image_list_arr.append(image)
					
	for file in label_list:
		if file.endswith('png'):
			label = cv2.imread(os.path.join(label_path,file), -1)
			label_list_arr.append(label)
	return (image_list_arr, label_list_arr)
	
# Yield the next batch of data
def next_batch(images, labels, batch_size, shuffle = False):
	assert len(images) == len(labels)
	if shuffle:
		indices = np.arange(len(images))
		np.random.shuffle(indices)
	for start_idx in range(0, len(images) - batch_size + 1, batch_size):
		if shuffle:
			excerpt = indices[start_idx : start_idx + batch_size]
		else:
			excerpt = slice(start_idx, start_idx + batch_size)
		yield np.array(images)[excerpt], np.array(labels)[excerpt]
	
	
def main():

	# Prefer relative paths
	image_path = './data/mri'
	label_path = './data/labels'
	# If you run out of memory, consider reducing the batch size
	batch_size = 4
	n_epoch = 2
	lr = 1e-4

	images, labels = get_data(image_path, label_path)

	# Split 80% of the data for training, the rest for validation
	ratio = 0.8
	length = len(images)
	s = int(length * ratio)
	x_train = images[: s]
	y_train = labels[: s]
	x_val = images[s: ]
	y_val = labels[s: ]

	# Reserved for dropout; unused here since this network has none
	keep_prob = tf.placeholder(tf.float32)
	# None means the number of samples is not fixed
	x = tf.placeholder(tf.float32, shape = [None, 256, 256, 3])
	y = tf.placeholder(tf.float32, shape = [None, 256, 256, 3])

	# input 256*256
	# weight([ksize, ksize, input, output])
	weight1 = weight_variable([3, 3, 3, 64])
	bias1 = bias_variable([64])
	conv1 = conv_layer(x, weight1, bias1)

	# input 256*256
	# output 128*128
	weight2 = weight_variable([3, 3, 64, 128])
	bias2 = bias_variable([128])
	conv2 = conv_layer(conv1, weight2, bias2)
	pool1 = max_pool_layer(conv2)

	# input 128*128
	# output 64*64
	weight3 = weight_variable([3, 3, 128, 256])
	bias3 = bias_variable([256])
	conv3 = conv_layer(pool1, weight3, bias3)
	pool2 = max_pool_layer(conv3)

	# deconv1
	# weight([ksize, ksize, output, input])
	# 64*64 -> 128*128 (pool1)
	deconv_weight1 = weight_variable([3, 3, 128, 256])
	deconv_b1 = bias_variable([128])
	deconv1 = deconv_layer(pool2, deconv_weight1, [batch_size, 128, 128, 128], deconv_b1)

	# Fuse with pool1; when fusing with add, the deconv and pool output channels must match
	fuse_pool1 = tf.add(deconv1, pool1)

	# deconv2
	# 128*128 -> 256*256 (input size)
	deconv_weight2 = weight_variable([3, 3, 64, 128])
	deconv_b2 = bias_variable([64])
	deconv2 = deconv_layer(fuse_pool1, deconv_weight2, [batch_size, 256, 256, 64], deconv_b2)

	# Project back to the same size as the input labels to get the final result
	weight16 = weight_variable([3, 3, 64, 3])
	bias16 = bias_variable([3])
	conv16 = tf.nn.conv2d(deconv2, weight16, strides = [1, 1, 1, 1], padding = 'SAME')
	conv16_b = tf.nn.bias_add(conv16, bias16)
	logits16 = conv16_b

	# Per-pixel softmax cross-entropy loss; y must be one-hot along the channel axis
	loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits = logits16, labels = y))
	opt = tf.train.AdamOptimizer(lr).minimize(loss)

	sess = tf.Session()
	sess.run(tf.global_variables_initializer())

	for epoch in range(n_epoch):
		# train
		for x_train_batch, y_train_batch in next_batch(x_train, y_train, batch_size, shuffle = True):
			_, train_loss = sess.run([opt, loss], feed_dict = {x: x_train_batch, y: y_train_batch})
			print("------train loss: %f" % train_loss)

		# val
		for x_val_batch, y_val_batch in next_batch(x_val, y_val, batch_size):
			val_loss = sess.run(loss, feed_dict = {x: x_val_batch, y: y_val_batch})
			print("------val loss: %f" % val_loss)
	sess.close()

if __name__ == '__main__':
	main()
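
A note on the labels: tf.nn.softmax_cross_entropy_with_logits treats the last axis of y as class scores, so every pixel of y must be a one-hot vector over the three channels. If your label PNGs are single-channel class-index masks, a conversion along these lines would be needed before feeding them in (a minimal sketch; to_one_hot and the three-class assumption are illustrative, not part of the original script):

def to_one_hot(mask, num_classes = 3):
	# mask: integer array of shape (H, W) with values in [0, num_classes)
	# returns a float32 array of shape (H, W, num_classes)
	return np.eye(num_classes, dtype = np.float32)[mask]

For example, get_data could append to_one_hot(label) instead of the raw label.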
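
The keep_prob placeholder is declared but never used, since the network deliberately has no dropout. For completeness, here is a minimal sketch of wiring it in with tf.nn.dropout; placing it after conv3 is an arbitrary illustration, not part of the original design:

	# Keep each activation with probability keep_prob, then pool as before
	conv3_drop = tf.nn.dropout(conv3, keep_prob)
	pool2 = max_pool_layer(conv3_drop)

Feed keep_prob < 1.0 while training and 1.0 while validating, e.g. feed_dict = {x: x_train_batch, y: y_train_batch, keep_prob: 0.5}.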