Commit 88c4368e by robin-ai-ml

initial release

MIT License

Copyright (c) 2020 robin-ai-ml

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# Face.KeyPoints
Face key points detection using Keras + TensorFlow: a CNN predicts 15 facial keypoints on 96x96 grayscale faces, and a webcam demo overlays sunglasses on detected faces.
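A minimal sketch of the training workflow (assuming the Kaggle facial-keypoints CSV sits at `./data/training.csv`, which is where `utils.load_data` looks; the epoch count here is an illustrative choice):

```python
import kmodel
from utils import load_data
from ShiftFlipPic import FlipPic
from sklearn.model_selection import train_test_split

# 96x96 grayscale faces scaled to [0, 1]; 30 keypoint values per face in [-1, 1].
X, y = load_data()
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2)

model = kmodel.compile_model(kmodel.create_model())
hist = kmodel.train_model(model, FlipPic(),
                          train=(X_train, y_train),
                          validation=(X_val, y_val),
                          batch_size=32, epochs=100, print_every=10)
kmodel.save_model(model, 'face_keypoints_detection_cnn_model')
```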
import numpy as np


class DataModifier(object):
    """Base class for batch-level data augmentation."""

    def fit(self, X_, y_):
        raise NotImplementedError


class FlipPic(DataModifier):
    """Horizontally flip half of the images in a batch, with their keypoints."""

    def __init__(self, flip_indices=None):
        if flip_indices is None:
            # Pairs of keypoint indices that swap under a horizontal flip
            # (e.g. left eye <-> right eye).
            flip_indices = [
                (0, 2), (1, 3),
                (4, 8), (5, 9), (6, 10), (7, 11),
                (12, 16), (13, 17), (14, 18), (15, 19),
                (22, 24), (23, 25)
            ]
        self.flip_indices = flip_indices

    def fit(self, X_batch, y_batch):
        batch_size = X_batch.shape[0]
        # Pick half of the batch at random and mirror it left-right.
        indices = np.random.choice(batch_size, batch_size // 2, replace=False)
        X_batch[indices] = X_batch[indices, :, ::-1, :]
        # x coordinates (even positions) are normalized to [-1, 1],
        # so a horizontal flip is just a sign change.
        y_batch[indices, ::2] = y_batch[indices, ::2] * -1
        # Swap paired landmarks: left eye <-> right eye,
        # left mouth corner <-> right mouth corner, and so on.
        for a, b in self.flip_indices:
            y_batch[indices, a], y_batch[indices, b] = (
                y_batch[indices, b], y_batch[indices, a]
            )
        return X_batch, y_batch


class ShiftFlipPic(FlipPic):
    """Random flip followed by a random horizontal/vertical shift."""

    def __init__(self, flip_indices=None, prop=0.1):
        super(ShiftFlipPic, self).__init__(flip_indices)
        self.prop = prop

    def fit(self, X, y):
        X, y = super(ShiftFlipPic, self).fit(X, y)
        X, y = self.shift_image(X, y, prop=self.prop)
        return X, y

    def random_shift(self, shift_range, n=96):
        '''
        :param shift_range: the maximum number of columns/rows to shift
        :return:
            keep[0]:   minimum row/column index to keep
            keep[1]:   maximum row/column index to keep
            assign[0]: minimum row/column index to assign
            assign[1]: maximum row/column index to assign
            shift:     amount to shift the landmark

            Note: assign[1] - assign[0] == keep[1] - keep[0]
        '''
        shift = np.random.randint(-shift_range, shift_range)

        def shift_left(n, shift):
            shift = np.abs(shift)
            return (0, n - shift)

        def shift_right(n, shift):
            shift = np.abs(shift)
            return (shift, n)

        if shift < 0:
            keep = shift_left(n, shift)
            assign = shift_right(n, shift)
        else:
            assign = shift_left(n, shift)  # stays below n (= 96)
            keep = shift_right(n, shift)
        return keep, assign, shift

    def shift_single_image(self, x_, y_, prop=0.1):
        '''
        :param x_: a single picture array, shape (96, 96, 1)
        :param y_: 15 landmark locations;
            [0::2] contains x-axis values,
            [1::2] contains y-axis values
        :param prop: maximum horizontal/vertical shift as a proportion
            of the image size, e.g. prop = 0.1 shifts the picture by at
            most int(0.1 * 96) = 9 columns/rows
        :return: x_, y_
        '''
        w_shift_max = int(x_.shape[0] * prop)
        h_shift_max = int(x_.shape[1] * prop)
        w_keep, w_assign, w_shift = self.random_shift(w_shift_max)
        h_keep, h_assign, h_shift = self.random_shift(h_shift_max)
        x_[w_assign[0]:w_assign[1],
           h_assign[0]:h_assign[1], :] = x_[w_keep[0]:w_keep[1],
                                            h_keep[0]:h_keep[1], :]
        # Landmarks live in [-1, 1], so convert the pixel shift accordingly.
        y_[0::2] = y_[0::2] - h_shift / float(x_.shape[0] / 2.)
        y_[1::2] = y_[1::2] - w_shift / float(x_.shape[1] / 2.)
        return x_, y_

    def shift_image(self, X, y, prop=0.1):
        # This loop could be vectorized for efficiency.
        for irow in range(X.shape[0]):
            x_ = X[irow]
            y_ = y[irow]
            X[irow], y[irow] = self.shift_single_image(x_, y_, prop=prop)
        return X, y
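

if __name__ == "__main__":
    # Illustrative smoke test (a sketch, not part of the original module):
    # run the augmenters on a random batch shaped like the Kaggle data,
    # i.e. images of shape (N, 96, 96, 1) with 30 landmark values per
    # image normalized to [-1, 1].
    X_demo = np.random.rand(8, 96, 96, 1).astype(np.float32)
    y_demo = (np.random.rand(8, 30).astype(np.float32) - 0.5) * 2
    X_aug, y_aug = FlipPic().fit(X_demo.copy(), y_demo.copy())
    print("FlipPic      ->", X_aug.shape, y_aug.shape)
    X_aug, y_aug = ShiftFlipPic(prop=0.1).fit(X_demo.copy(), y_demo.copy())
    print("ShiftFlipPic ->", X_aug.shape, y_aug.shape)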
from __future__ import division
from keras.backend.tensorflow_backend import set_session
import tensorflow as tf
import numpy as np
import time
import os
import cv2
import kmodel
from utils import transparentOverlay

os.environ['KERAS_BACKEND'] = 'tensorflow'
print(tf.__version__)

config = tf.ConfigProto(log_device_placement=True, allow_soft_placement=True,
                        gpu_options=tf.GPUOptions(per_process_gpu_memory_fraction=0.7))
# Alternatives: allow_growth=True, or per_process_gpu_memory_fraction=0.3
sess = tf.Session(config=config)
set_session(sess)
# os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

# Load a pre-trained model:
# my_model = kmodel.load_trained_model('yuan_model_mac')

# Load the model trained by train.py (uncomment the line above to test
# the pre-trained one instead).
my_model = kmodel.load_trained_model('face_keypoints_detection_cnn_model')

# Create the face detector.
face_cascade = cv2.CascadeClassifier(
    'cascades/haarcascade_frontalface_default.xml')
# smileCascade = cv2.CascadeClassifier('cascades/haarcascade_smile.xml')

# Open the camera.
camera = cv2.VideoCapture(0)

# Load a sunglasses image with an alpha channel (BGRA).
sunglasses = cv2.imread('sunglass.png', cv2.IMREAD_UNCHANGED)

# Main loop.
while True:
    # time.sleep(0.01)
    # Grab a frame from the camera.
    (_, frame) = camera.read()
    frame = cv2.flip(frame, 1)
    frame2 = np.copy(frame)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # Detect all faces.
    faces = face_cascade.detectMultiScale(gray, 1.25, 6)

    # For each detected face:
    for (x, y, w, h) in faces:
        # Crop out the face region.
        gray_face = gray[y:y+h, x:x+w]
        color_face = frame[y:y+h, x:x+w]

        # Normalize the face image to [0, 1].
        gray_normalized = gray_face / 255

        # Resize the grayscale face to the network's 96x96 input.
        original_shape = gray_face.shape  # kept for scaling back later
        face_resized = cv2.resize(
            gray_normalized, (96, 96), interpolation=cv2.INTER_AREA)
        face_resized = face_resized.reshape(1, 96, 96, 1)

        # Predict the keypoint coordinates.
        keypoints = my_model.predict(face_resized)

        # Map the keypoint values from [-1, 1] back to [0, 96].
        keypoints = keypoints * 48 + 48

        # Resize the color face to 96x96 to match the keypoints.
        face_resized_color = cv2.resize(
            color_face, (96, 96), interpolation=cv2.INTER_AREA)
        face_resized_color2 = np.copy(face_resized_color)

        # Pair up the 30 network outputs into 15 (x, y) tuples.
        points = []
        for i, co in enumerate(keypoints[0][0::2]):
            points.append((co, keypoints[0][1::2][i]))

        # Glasses width from left_eyebrow_outer_end_x (point 7) and
        # right_eyebrow_outer_end_x (point 9).
        sunglass_width = int((points[7][0] - points[9][0]) * 1.1)
        # Glasses height from nose_tip_y (point 10) and
        # right_eyebrow_inner_end_y (point 8).
        sunglass_height = int((points[10][1] - points[8][1]) / 1.1)
        sunglass_resized = cv2.resize(
            sunglasses, (sunglass_width, sunglass_height), interpolation=cv2.INTER_CUBIC)
        face_resized_color = transparentOverlay(face_resized_color, sunglass_resized, pos=(
            int(points[9][0]), int(points[9][1])), scale=1)

        # Scale the face with the glasses back to its original size in the
        # frame (the Haar cascade returns square boxes, so the (h, w)
        # ordering of original_shape is safe here).
        frame[y:y+h, x:x+w] = cv2.resize(face_resized_color,
                                         original_shape, interpolation=cv2.INTER_CUBIC)

        # Draw the keypoints on the face image (cv2.circle needs ints).
        for keypoint in points:
            cv2.circle(face_resized_color2,
                       (int(keypoint[0]), int(keypoint[1])), 1, (0, 255, 0), 1)

        frame2[y:y+h, x:x+w] = cv2.resize(face_resized_color2,
                                          original_shape, interpolation=cv2.INTER_CUBIC)

    # Show the frame with the glasses overlaid.
    cv2.imshow("With Glass", frame)
    # Show the frame with the keypoints drawn.
    cv2.imshow("With Keypoints", frame2)

    # Exit the loop when 'q' is pressed.
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

# Release the camera and close the windows.
camera.release()
cv2.destroyAllWindows()
from __future__ import division
from __future__ import print_function
from keras.models import Sequential
from keras.models import load_model
from keras.layers import Convolution2D, MaxPooling2D, Dropout
from keras.layers import Flatten, Dense
from keras.optimizers import SGD, RMSprop, Adagrad, Adadelta, Adam, Adamax, Nadam
from keras.preprocessing.image import ImageDataGenerator
import numpy as np


def create_model(useDropout=False):
    '''
    The network takes a 96x96 single-channel (grayscale) image as input and
    outputs 30 values: the x and y coordinates of 15 keypoints.
    '''
    model = Sequential()
    model.add(Convolution2D(32, (5, 5), input_shape=(96, 96, 1), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    if useDropout:
        model.add(Dropout(0.1))
    model.add(Convolution2D(64, (2, 2), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    if useDropout:
        model.add(Dropout(0.1))
    model.add(Convolution2D(128, (2, 2), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    if useDropout:
        model.add(Dropout(0.1))
    model.add(Flatten())
    model.add(Dense(500, activation='relu'))
    if useDropout:
        model.add(Dropout(0.1))
    model.add(Dense(500, activation='relu'))
    if useDropout:
        model.add(Dropout(0.1))
    model.add(Dense(30))
    return model


def compile_model(model):
    sgd = SGD(lr=0.01, momentum=0.9, nesterov=True)
    optimizer = sgd
    loss = "mean_squared_error"
    # Note: accuracy is not a meaningful metric for this regression task;
    # it is kept only for logging.
    metrics = ['accuracy']
    model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
    return model


'''
def train_model(model, X_train, y_train):
    return model.fit(X_train, y_train, epochs=100, batch_size=200, verbose=1, validation_split=0.2)
'''


def train_model(model, modifier, train, validation,
                batch_size=32, epochs=2000, print_every=10, patience=np.inf):
    '''
    model:      Keras model object
    modifier:   DataModifier() object
    train:      tuple of two numpy arrays (X_train, y_train)
    validation: tuple of two numpy arrays (X_val, y_val)
    patience:   training stops early if val_loss has not improved
                for `patience` consecutive epochs
    '''
    # A hand-written fit loop so the modifier can augment each batch.
    X_train, y_train = train
    X_val, y_val = validation
    generator = ImageDataGenerator()
    history = {"loss": [], "val_loss": []}
    for e in range(epochs):
        if e % print_every == 0:
            print('Epoch {:4}:'.format(e), end=' ')
        ## -------- ##
        ## training
        ## -------- ##
        batches = 0
        loss_epoch = []
        for X_batch, y_batch in generator.flow(X_train, y_train, batch_size=batch_size):
            X_batch, y_batch = modifier.fit(X_batch, y_batch)
            hist = model.fit(X_batch, y_batch, verbose=False, epochs=1)
            loss_epoch.extend(hist.history["loss"])
            batches += 1
            if batches >= len(X_train) / batch_size:
                # We need to break the loop by hand because
                # the generator loops indefinitely.
                break
        loss = np.mean(loss_epoch)
        history["loss"].append(loss)
        ## ---------- ##
        ## validation
        ## ---------- ##
        y_pred = model.predict(X_val)
        val_loss = np.mean((y_pred - y_val)**2)
        history["val_loss"].append(val_loss)
        if e % print_every == 0:
            print("loss - {:6.5f}, val_loss - {:6.5f}".format(loss, val_loss))
        min_val_loss = np.min(history["val_loss"])
        ## Early stopping: stop once the best val_loss lies strictly below
        ## every one of the last `patience` epochs.
        if patience is not np.inf:
            if np.all(min_val_loss < np.array(history["val_loss"])[-patience:]):
                break
    return history


def save_model(model, fileName):
    model.save(fileName + '.h5')


def load_trained_model(fileName):
    return load_model(fileName + '.h5')
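

# Usage sketch (illustrative; the model file name matches the one used by
# train.py and the webcam demo):
#   model = compile_model(create_model(useDropout=True))
#   history = train_model(model, modifier, (X_train, y_train), (X_val, y_val))
#   save_model(model, 'face_keypoints_detection_cnn_model')
#   model = load_trained_model('face_keypoints_detection_cnn_model')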
from utils import load_data
from utils import plot_data
from utils import plot_loss
from utils import plot_predicted_images
import kmodel, os
# Data augmentation.
from ShiftFlipPic import FlipPic
from sklearn.model_selection import train_test_split

os.environ['KERAS_BACKEND'] = 'tensorflow'
import tensorflow as tf
from keras.backend.tensorflow_backend import set_session
print(tf.__version__)

config = tf.ConfigProto(log_device_placement=True, allow_soft_placement=True,
                        gpu_options=tf.GPUOptions(per_process_gpu_memory_fraction=0.7))
# Alternatives: allow_growth=True, or per_process_gpu_memory_fraction=0.3
sess = tf.Session(config=config)
set_session(sess)

# Load the training data.
X_train, y_train = load_data()
print("X.shape == {}; X.min == {:.3f}; X.max == {:.3f}".format(
    X_train.shape, X_train.min(), X_train.max()))
print("y.shape == {}; y.min == {:.3f}; y.max == {:.3f}".format(
    y_train.shape, y_train.min(), y_train.max()))

# Display one picture.
plot_data(X_train[0], y_train[0])

# Build the network.
my_model = kmodel.create_model()

# Compile the model.
my_model = kmodel.compile_model(my_model)

# Train the model.
# hist = kmodel.train_model(my_model, X_train, y_train)
modifier = FlipPic()  # data augmentation
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)
hist = kmodel.train_model(my_model, modifier,
                          train=(X_train, y_train),
                          validation=(X_val, y_val),
                          batch_size=32, epochs=2000, print_every=100)
plot_loss(hist, "Face key points CNN model")

# A simple prediction test.
X_test, _ = load_data(test=True)
y_test = my_model.predict(X_test)

# Display the predicted result.
plot_predicted_images(X_test, y_test, 18)

kmodel.save_model(my_model, 'face_keypoints_detection_cnn_model')
import numpy as np
from pandas.io.parsers import read_csv
from sklearn.utils import shuffle
import cv2
import matplotlib.pyplot as plt
from keras.models import load_model


def load_data(test=False):
    """
    Load the test data when `test` is True, otherwise load the training data.
    """
    FTRAIN = './data/training.csv'
    FTEST = './data/test.csv'
    fname = FTEST if test else FTRAIN
    df = read_csv(fname)

    # Convert the space-separated numbers in the 'Image' column
    # into a numpy array per row.
    df['Image'] = df['Image'].apply(lambda im: np.fromstring(im, sep=' '))
    print("total count:{} ".format(df.count()))

    # Drop rows with missing values.
    df = df.dropna()
    print("after dropna count:{} ".format(df.count()))

    # Scale the pixel values from integers in [0, 255] to floats in [0, 1].
    X = np.vstack(df['Image'].values) / 255.
    X = X.astype(np.float32)

    # Reshape each row of X into a 96 x 96 x 1 array.
    X = X.reshape(-1, 96, 96, 1)

    # Only FTRAIN has the keypoint data (the target values).
    if not test:
        y = df[df.columns[:-1]].values
        # Normalize the keypoint values to [-1, 1].
        y = (y - 48) / 48
        # Shuffle the training data.
        X, y = shuffle(X, y, random_state=42)
        y = y.astype(np.float32)
    else:
        y = None
    return X, y

def transparentOverlay(src, overlay, pos=(0, 0), scale=1):
    """
    Overlay an image with an alpha channel (a PNG) on top of the src image.
    :param src: background image
    :param overlay: image with an alpha channel (BGRA)
    :param pos: top-left position of the overlay, as (x, y)
    :param scale: scale factor for the overlay image
    :return: resultant image
    """
    if scale != 1:
        overlay = cv2.resize(overlay, (0, 0), fx=scale, fy=scale)
    # Height and width of the overlay image.
    h, w, _ = overlay.shape
    # Starting coordinates of the overlay (y indexes columns, x rows below).
    y, x = pos[0], pos[1]
    # The commented-out code below is the unoptimized version; it is easier
    # to follow and does the same thing as the vectorized version after it.
    """
    # Height and width of the src image.
    rows, cols, _ = src.shape
    for i in range(h):
        for j in range(w):
            if x+i >= rows or y+j >= cols:
                continue
            alpha = float(overlay[i][j][3]/255.0)  # read the alpha channel
            src[x+i][y+j] = alpha*overlay[i][j][:3]+(1-alpha)*src[x+i][y+j]
    return src """
    # Vectorized version; note it assumes the overlay fits inside src.
    alpha = overlay[:, :, 3]/255.0
    alpha = alpha[..., np.newaxis]
    src[x:x+h, y:y+w, :] = alpha * overlay[:, :, :3] + \
        (1-alpha)*src[x:x+h, y:y+w, :]
    return src
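

# Usage sketch (illustrative): overlay a 4-channel PNG onto a BGR frame at
# pixel position (x, y); the overlay must fit inside the frame.
#   glasses = cv2.imread('sunglass.png', cv2.IMREAD_UNCHANGED)  # BGRA
#   frame = transparentOverlay(frame, glasses, pos=(x, y), scale=0.5)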


def plot_data(img, landmarks, axis=plt):
    """
    Plot the image (img) along with its normalized facial keypoints (landmarks).
    """
    axis.imshow(np.squeeze(img), cmap='gray')  # plot the image
    landmarks = landmarks * 48 + 48            # undo the normalization
    # Plot the keypoints.
    axis.scatter(landmarks[0::2],
                 landmarks[1::2],
                 marker='o',
                 c='c',
                 s=40)
    axis.show()


def plot_keypoints(img_path,
                   face_cascade=cv2.CascadeClassifier(
                       'haarcascade_frontalface_alt.xml'),
                   model_path='my_model.h5'):
    # Plot predicted keypoints on an arbitrary image containing a human face.
    img = cv2.imread(img_path)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray)
    fig = plt.figure(figsize=(5, 5))
    ax = fig.add_subplot(1, 1, 1, xticks=[], yticks=[])
    ax.imshow(cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB))
    if len(faces) == 0:
        plt.title('no faces detected')
    elif len(faces) > 1:
        plt.title('many faces detected')
        for (x, y, w, h) in faces:
            rectangle = cv2.rectangle(
                img, (x, y), (x+w, y+h), (255, 255, 0), 2)
            ax.imshow(cv2.cvtColor(rectangle, cv2.COLOR_BGR2RGB))
    elif len(faces) == 1:
        plt.title('one face detected')
        x, y, w, h = faces[0]
        bgr_crop = img[y:y+h, x:x+w]
        orig_shape_crop = bgr_crop.shape
        gray_crop = cv2.cvtColor(bgr_crop, cv2.COLOR_BGR2GRAY)
        resize_gray_crop = cv2.resize(gray_crop, (96, 96)) / 255.
        model = load_model(model_path)
        landmarks = np.squeeze(model.predict(
            np.expand_dims(np.expand_dims(resize_gray_crop, axis=-1), axis=0)))
        # Map predictions from [-1, 1] back to crop pixels, then to image
        # coordinates (crop width scales x, crop height scales y).
        ax.scatter(((landmarks[0::2] * 48 + 48)*orig_shape_crop[1]/96)+x,
                   ((landmarks[1::2] * 48 + 48)*orig_shape_crop[0]/96)+y,
                   marker='o', c='c', s=40)
    plt.show()


def plot_loss(hist, name, plt=plt, RMSE_TF=False):
    '''
    RMSE_TF: if True, the RMSE is plotted on the original (pixel) scale
    '''
    loss = hist['loss']
    val_loss = hist['val_loss']
    if RMSE_TF:
        loss = np.sqrt(np.array(loss))*48
        val_loss = np.sqrt(np.array(val_loss))*48
    plt.figure(figsize=(8, 8))
    plt.plot(loss, "--", linewidth=3, label="train:"+name)
    plt.plot(val_loss, linewidth=3, label="val:"+name)
    plt.legend()
    plt.grid()
    plt.yscale("log")
    plt.xlabel("epoch")
    plt.ylabel("loss")
    plt.show()


def plot_sample(X, y, axs):
    '''
    Kaggle pictures are 96 by 96;
    y is rescaled to range between -1 and 1.
    '''
    axs.imshow(X.reshape(96, 96), cmap="gray")
    axs.scatter(48*y[0::2] + 48, 48*y[1::2] + 48)


def plot_predicted_images(X_test, y_test, num_of_images, plt=plt):
    fig = plt.figure(figsize=(7, 7))
    fig.subplots_adjust(hspace=0.13, wspace=0.0001,
                        left=0, right=1, bottom=0, top=1)
    count = 1
    for irow in range(num_of_images):
        ipic = np.random.choice(X_test.shape[0])
        # Integer division keeps add_subplot happy on Python 3.
        ax = fig.add_subplot(num_of_images // 3, 3, count, xticks=[], yticks=[])
        plot_sample(X_test[ipic], y_test[ipic], ax)
        ax.set_title("images " + str(ipic))
        count += 1
    plt.show()