深度有趣 | 27 服饰关键点定位

news/2024/7/3 6:01:22

简介

介绍如何使用CPM(Convolutional Pose Machines)实现服饰关键点定位

原理

关键点定位是一类常见而有用的任务,某种意义上可以理解为一种特征工程

  • 人脸关键点定位,可用于人脸识别、表情识别
  • 人体骨骼关键点定位,可用于姿态估计
  • 手部关键点定位,可用于手势识别

输入是一张图片,输出是每个关键点的x、y坐标,一般会归一化到0~1区间中,所以可以理解为回归问题

但是直接对坐标值进行回归会导致较大误差,更好的做法是输出一个低分辨率的热图,使得关键点所在位置输出较高响应,而其他位置则输出较低响应

CPM(Convolutional Pose Machines)的基本思想是使用多个级联的stage,每个stage包含多个CNN并且都输出热图

通过最小化每个stage的热图和ground truth之间的差距,从而得到越来越准确的关键点定位结果

数据

使用天池FashionAI全球挑战赛提供的数据,fashionai.alibaba.com/

其中服饰关键点定位赛题提供的训练集包括4W多张图片,测试集包括将近1W张图片

每张图片都指定了对应的服饰类别,共5类:上衣(blouse)、外套(outwear)、连身裙(dress)、半身裙(skirt)、裤子(trousers)

训练集还提供了每张图片对应的24个关键点的标注,包括x坐标、y坐标、是否可见三项信息,但并不是每类服饰都有24个关键点

关于以上数据的更多介绍可以参考以下文章,zhuanlan.zhihu.com/p/34928763

为了简化问题,以下仅使用dress类别的训练集数据训练CPM模型

实现

加载库

# -*- coding: utf-8 -*-import tensorflow as tf
import numpy as np
import pandas as pd
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
import cv2
import matplotlib.pyplot as plt
%matplotlib inline
from imageio import imread, imsave
import os
import glob
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')
复制代码

加载训练集和测试集

train = pd.read_csv(os.path.join('data', 'train', 'train.csv'))
train_warm = pd.read_csv(os.path.join('data', 'train_warm', 'train_warm.csv'))
test = pd.read_csv(os.path.join('data', 'test', 'test.csv'))
print(len(train), len(train_warm), len(test))columns = train.columns
print(len(columns), columns)train['image_id'] = train['image_id'].apply(lambda x:os.path.join('train', x))
train_warm['image_id'] = train_warm['image_id'].apply(lambda x:os.path.join('train_warm', x))
train = pd.concat([train, train_warm])
del train_warm
train.head()
复制代码

仅保留dress类别的数据

train = train[train.image_category == 'dress']
test = test[test.image_category == 'dress']
print(len(train), len(test))
复制代码

拆分标注信息中的x坐标、y坐标、是否可见

for col in columns:if col in ['image_id', 'image_category']:continuetrain[col + '_x'] = train[col].apply(lambda x:float(x.split('_')[0]))train[col + '_y'] = train[col].apply(lambda x:float(x.split('_')[1]))train[col + '_s'] = train[col].apply(lambda x:float(x.split('_')[2]))train.drop([col], axis=1, inplace=True)
train.head()
复制代码

将x坐标和y坐标进行归一化

features = ['neckline_left', 'neckline_right', 'center_front', 'shoulder_left', 'shoulder_right', 'armpit_left', 'armpit_right', 'waistline_left', 'waistline_right', 'cuff_left_in', 'cuff_left_out', 'cuff_right_in', 'cuff_right_out', 'hemline_left', 'hemline_right']train = train.to_dict('records')
for i in tqdm(range(len(train))):record = train[i]img = imread(os.path.join('data', record['image_id']))h = img.shape[0]w = img.shape[1]for col in features:if record[col + '_s'] >= 0:train[i][col + '_x'] /= wtrain[i][col + '_y'] /= helse:train[i][col + '_x'] = 0train[i][col + '_y'] = 0
复制代码

随机选一些训练数据并绘图查看

img_size = 256
r = 10
c = 10
puzzle = np.ones((img_size * r, img_size * c, 3))
random_indexs = np.random.choice(len(train), 100)
for i in range(100):record = train[random_indexs[i]]img = imread(os.path.join('data', record['image_id']))img = cv2.resize(img, (img_size, img_size))for col in features:if record[col + '_s'] >= 0:cv2.circle(img, (int(img_size * record[col + '_x']), int(img_size * record[col + '_y'])), 3, (120, 240, 120), 2)img = img / 255.r = i // 10c = i % 10puzzle[r * img_size: (r + 1) * img_size, c * img_size: (c + 1) * img_size, :] = img
plt.figure(figsize=(15, 15))
plt.imshow(puzzle)
plt.show()
复制代码

整理数据并分割训练集和验证集

X_train = []
Y_train = []
for i in tqdm(range(len(train))):record = train[i]img = imread(os.path.join('data', record['image_id']))img = cv2.resize(img, (img_size, img_size))y = []for col in features:y.append([record[col + '_x'], record[col + '_y']])X_train.append(img)Y_train.append(y)X_train = np.array(X_train)
Y_train = np.array(Y_train)
print(X_train.shape)
print(Y_train.shape)X_train, X_valid, Y_train, Y_valid = train_test_split(X_train, Y_train, test_size=0.1)
print(X_train.shape, Y_train.shape)
print(X_valid.shape, Y_valid.shape)
复制代码

定义一些参数和模型输入

batch_size = 16
heatmap_size = 32
stages = 6
y_dim = Y_train.shape[1]X = tf.placeholder(tf.float32, [None, img_size, img_size, 3], name='X')
Y = tf.placeholder(tf.float32, [None, heatmap_size, heatmap_size, y_dim + 1], name='Y')def conv2d(inputs, filters, kernel_size, padding='same', activation=tf.nn.relu, name=''):if name:return tf.layers.conv2d(inputs, filters=filters, kernel_size=kernel_size, strides=1, padding=padding, activation=activation, name=name, kernel_initializer=tf.contrib.layers.xavier_initializer())else:return tf.layers.conv2d(inputs, filters=filters, kernel_size=kernel_size, strides=1, padding=padding, activation=activation, kernel_initializer=tf.contrib.layers.xavier_initializer())def maxpool2d(inputs):return tf.layers.max_pooling2d(inputs, pool_size=2, strides=2, padding='valid')
复制代码

定义CPM模型,使用6个stage

stage_heatmaps = []# sub_stage
h0 = maxpool2d(conv2d(conv2d(X, 64, 3), 64, 3))
h0 = maxpool2d(conv2d(conv2d(h0, 128, 3), 128, 3))
h0 = maxpool2d(conv2d(conv2d(conv2d(conv2d(h0, 256, 3), 256, 3), 256, 3), 256, 3))
for i in range(6):h0 = conv2d(h0, 512, 3)
sub_stage = conv2d(h0, 128, 3) # batch_size, 32, 32, 128# stage_1
h0 = conv2d(sub_stage, 512, 1, padding='valid')
h0 = conv2d(h0, y_dim + 1, 1, padding='valid', activation=None, name='stage_1')
stage_heatmaps.append(h0)# other stages
for stage in range(2, stages + 1):h0 = tf.concat([stage_heatmaps[-1], sub_stage], axis=3)for i in range(5):h0 = conv2d(h0, 128, 7)h0 = conv2d(h0, 128, 1, padding='valid')h0 = conv2d(h0, y_dim + 1, 1, padding='valid', activation=None, name='stage_%d' % stage)stage_heatmaps.append(h0)
复制代码

定义损失函数和优化器

global_step = tf.Variable(0, trainable=False)
learning_rate = tf.train.exponential_decay(0.001, global_step=global_step, decay_steps=1000, decay_rate=0.9)losses = [0 for _ in range(stages)]
total_loss = 0
for stage in range(stages):losses[stage] = tf.losses.mean_squared_error(Y, stage_heatmaps[stage])total_loss += losses[stage]total_loss_with_reg = total_loss + tf.contrib.layers.apply_regularization(tf.contrib.layers.l2_regularizer(1e-10), tf.trainable_variables())
total_loss = total_loss / stages
total_loss_with_reg = total_loss_with_reg / stagesoptimizer = tf.contrib.layers.optimize_loss(total_loss_with_reg, global_step=global_step, learning_rate=learning_rate, optimizer='Adam', increment_global_step=True)
复制代码

由于训练数据较少,因此定义一个数据增强的函数

def transform(X_batch, Y_batch):X_data = []Y_data = []offset = 20for i in range(X_batch.shape[0]):img = X_batch[i]# random rotationdegree = int(np.random.random() * offset - offset / 2)rad = degree / 180 * np.pimat = cv2.getRotationMatrix2D((img_size / 2, img_size / 2), degree, 1)img_ = cv2.warpAffine(img, mat, (img_size, img_size), borderValue=(255, 255, 255))# random translationx0 = int(np.random.random() * offset - offset / 2)y0 = int(np.random.random() * offset - offset / 2)mat = np.float32([[1, 0, x0], [0, 1, y0]])img_ = cv2.warpAffine(img_, mat, (img_size, img_size), borderValue=(255, 255, 255))# random flipif np.random.random() > 0.5:img_ = np.fliplr(img_)flip = Trueelse:flip = FalseX_data.append(img_)points = []for j in range(y_dim):x = Y_batch[i, j, 0] * img_sizey = Y_batch[i, j, 1] * img_size# random rotationdx = x - img_size / 2  dy = y - img_size / 2x = int(dx * np.cos(rad) + dy * np.sin(rad) + img_size / 2)  y = int(-dx * np.sin(rad) + dy * np.cos(rad) + img_size / 2)# random translationx += x0y += y0x = x / img_sizey = y / img_sizepoints.append([x, y])# random flipif flip:data = {features[j]: points[j] for j in range(y_dim)}points = []for j in range(y_dim):col = features[j]if col.find('left') >= 0:col = col.replace('left', 'right')elif col.find('right') >= 0:col = col.replace('right', 'left')[x, y] = data[col]x = 1 - xpoints.append([x, y])Y_data.append(points)X_data = np.array(X_data)Y_data = np.array(Y_data)# preprocessX_data = (X_data / 255. - 0.5) * 2Y_heatmap = []for i in range(Y_data.shape[0]):heatmaps = []invert_heatmap = np.ones((heatmap_size, heatmap_size))for j in range(Y_data.shape[1]):x0 = int(Y_data[i, j, 0] * heatmap_size)y0 = int(Y_data[i, j, 1] * heatmap_size)x = np.arange(0, heatmap_size, 1, float)y = x[:, np.newaxis]cur_heatmap = np.exp(-((x - x0) ** 2 + (y - y0) ** 2) / (2.0 * 1.0 ** 2))heatmaps.append(cur_heatmap)invert_heatmap -= cur_heatmapheatmaps.append(invert_heatmap)Y_heatmap.append(heatmaps)Y_heatmap = np.array(Y_heatmap)Y_heatmap = np.transpose(Y_heatmap, (0, 2, 3, 1)) # batch_size, heatmap_size, heatmap_size, y_dim + 1return X_data, Y_data, Y_heatmap
复制代码

查看数据增强之后的图片和关键点是否正确

X_batch = X_train[:batch_size]
Y_batch = Y_train[:batch_size]
X_data, Y_data, Y_heatmap = transform(X_batch, Y_batch)n = int(np.sqrt(batch_size))
puzzle = np.ones((img_size * n, img_size * n, 3))
for i in range(batch_size):img = (X_data[i] + 1) / 2for j in range(y_dim):cv2.circle(img, (int(img_size * Y_data[i, j, 0]), int(img_size * Y_data[i, j, 1])), 3, (120, 240, 120), 2)r = i // nc = i % npuzzle[r * img_size: (r + 1) * img_size, c * img_size: (c + 1) * img_size, :] = img
plt.figure(figsize=(12, 12))
plt.imshow(puzzle)
plt.show()
复制代码

可以看到,在经过随机旋转、随机平移、随机水平翻转之后,图片和关键点依旧一一对应

训练模型,使用early stopping

sess = tf.Session()
sess.run(tf.global_variables_initializer())OUTPUT_DIR = 'samples'
if not os.path.exists(OUTPUT_DIR):os.mkdir(OUTPUT_DIR)for stage in range(stages):tf.summary.scalar('loss/loss_stage_%d' % (stage + 1), losses[stage])
tf.summary.scalar('loss/total_loss', total_loss)
summary = tf.summary.merge_all()
writer = tf.summary.FileWriter(OUTPUT_DIR)loss_valid_min = np.inf
saver = tf.train.Saver()epochs = 100
patience = 10
for e in range(epochs):loss_train = []loss_valid = []X_train, Y_train = shuffle(X_train, Y_train)for i in tqdm(range(X_train.shape[0] // batch_size)):X_batch = X_train[i * batch_size: (i + 1) * batch_size, :, :, :]Y_batch = Y_train[i * batch_size: (i + 1) * batch_size, :, :]X_data, Y_data, Y_heatmap = transform(X_batch, Y_batch)_, ls, lr, stage_heatmaps_ = sess.run([optimizer, total_loss, learning_rate, stage_heatmaps], feed_dict={X: X_data, Y: Y_heatmap})loss_train.append(ls)if i > 0 and i % 100 == 0:writer.add_summary(sess.run(summary, feed_dict={X: X_data, Y: Y_heatmap}), e * X_train.shape[0] // batch_size + i)writer.flush()loss_train = np.mean(loss_train)demo_img = (X_data[0] + 1) / 2demo_heatmaps = []for stage in range(stages):demo_heatmap = stage_heatmaps_[stage][0, :, :, :y_dim].reshape((heatmap_size, heatmap_size, y_dim))demo_heatmap = cv2.resize(demo_heatmap, (img_size, img_size))demo_heatmap = np.amax(demo_heatmap, axis=2)demo_heatmap = np.reshape(demo_heatmap, (img_size, img_size, 1))demo_heatmap = np.repeat(demo_heatmap, 3, axis=2)demo_heatmaps.append(demo_heatmap)demo_gt_heatmap = Y_heatmap[0, :, :, :y_dim].reshape((heatmap_size, heatmap_size, y_dim))demo_gt_heatmap = cv2.resize(demo_gt_heatmap, (img_size, img_size))demo_gt_heatmap = np.amax(demo_gt_heatmap, axis=2)demo_gt_heatmap = np.reshape(demo_gt_heatmap, (img_size, img_size, 1))demo_gt_heatmap = np.repeat(demo_gt_heatmap, 3, axis=2)upper_img = np.concatenate((demo_heatmaps[0], demo_heatmaps[1], demo_heatmaps[2]), axis=1)blend_img = 0.5 * demo_img + 0.5 * demo_gt_heatmaplower_img = np.concatenate((demo_heatmaps[-1], demo_gt_heatmap, blend_img), axis=1)demo_img = np.concatenate((upper_img, lower_img), axis=0)imsave(os.path.join(OUTPUT_DIR, 'sample_%d.jpg' % e), demo_img)X_valid, Y_valid = shuffle(X_valid, Y_valid)for i in range(X_valid.shape[0] // batch_size):X_batch = X_valid[i * batch_size: (i + 1) * batch_size, :, :, :]Y_batch = Y_valid[i * batch_size: (i + 1) * batch_size, :, :]X_data, Y_data, Y_heatmap = transform(X_batch, Y_batch)ls = sess.run(total_loss, feed_dict={X: X_data, Y: Y_heatmap})loss_valid.append(ls)loss_valid = np.mean(loss_valid)print('Epoch %d, lr %.6f, train loss %.6f, valid loss %.6f' % (e, lr, loss_train, loss_valid))if loss_valid < loss_valid_min:print('Saving model...')saver.save(sess, os.path.join(OUTPUT_DIR, 'cpm'))loss_valid_min = loss_validpatience = 10else:patience -= 1if patience == 0:break
复制代码

经过87个epoch之后训练提前停止,训练集损失为0.001228,验证集损失为0.001871,验证集最低损失为0.001821

训练集结果如下,上面三张图依次为stage1、stage2、stage3的结果,下面三张图依次为stage6、ground truth、groud truth和原图

在本机上使用以下代码,对测试集中的图片进行关键点定位

# -*- coding: utf-8 -*-import tensorflow as tf
import numpy as np
import pandas as pd
from sklearn.utils import shuffle
import cv2
import matplotlib.pyplot as plt
from imageio import imread, imsave
import ostest = pd.read_csv(os.path.join('data', 'test', 'test.csv'))
test['image_id'] = test['image_id'].apply(lambda x:os.path.join('test', x))
test = test[test.image_category == 'dress']
test = test['image_id'].valuesbatch_size = 16
img_size = 256
test = shuffle(test)
test = test[:batch_size]
X_test = []
for i in range(batch_size):img = imread(os.path.join('data', test[i]))img = cv2.resize(img, (img_size, img_size))X_test.append(img)
X_test = np.array(X_test)
print(X_test.shape)sess = tf.Session()
sess.run(tf.global_variables_initializer())OUTPUT_DIR = 'samples'
saver = tf.train.import_meta_graph(os.path.join(OUTPUT_DIR, 'cpm.meta'))
saver.restore(sess, tf.train.latest_checkpoint(OUTPUT_DIR))stages = 6
y_dim = 15
heatmap_size = 32
graph = tf.get_default_graph()
X = graph.get_tensor_by_name('X:0')
stage_heatmap = graph.get_tensor_by_name('stage_%d/BiasAdd:0' % stages)def visualize_result(imgs, heatmap, joints):imgs = imgs.astype(np.int32)coords = []for i in range(imgs.shape[0]):hp = heatmap[i, :, :, :joints].reshape((heatmap_size, heatmap_size, joints))hp = cv2.resize(hp, (img_size, img_size))coord = np.zeros((joints, 2))for j in range(joints):xy = np.unravel_index(np.argmax(hp[:, :, j]), (img_size, img_size))coord[j, :] = [xy[0], xy[1]]cv2.circle(imgs[i], (xy[1], xy[0]), 3, (120, 240, 120), 2)coords.append(coord)return imgs / 255., coordsheatmap = sess.run(stage_heatmap, feed_dict={X: (X_test / 255. - 0.5) * 2})
X_test, coords = visualize_result(X_test, heatmap, y_dim) n = int(np.sqrt(batch_size))
puzzle = np.ones((img_size * n, img_size * n, 3))
for i in range(batch_size):img = X_test[i]r = i // nc = i % npuzzle[r * img_size: (r + 1) * img_size, c * img_size: (c + 1) * img_size, :] = img
plt.figure(figsize=(12, 12))
plt.imshow(puzzle)
plt.show()
imsave('服饰关键点定位测试集结果.jpg', puzzle)
复制代码

测试集结果如下,对于大多数情况能够得到很准确的关键点定位结果

参考

  • Convolutional Pose Machines:arxiv.org/abs/1602.00…
  • Code repository for Convolutional Pose Machines:github.com/shihenw/con…
  • 天池FashionAI全球挑战赛小小尝试:zhuanlan.zhihu.com/p/34928763

视频讲解课程

深度有趣(一)


http://lihuaxi.xjx100.cn/news/237012.html

相关文章

对标以太坊的EOS再火,也拼不过InterValue的区块链4.0

链客&#xff0c;专为开发者而生&#xff0c;有问必答&#xff01; 此文章来自区块链技术社区&#xff0c;未经允许拒绝转载。 2017年6月26日上线的数字货币EOS&#xff0c;仅用5天时间&#xff0c;就融了1.85亿美元&#xff0c;一举打破ICO的融资记录。 至2018年4月12日&a…

Python中处理时间 —— time模块

time模块 逝去的秒数 逝去的秒数表示从某个时间&#xff08;Python中是“Thu Jan 1 07:00:00 1970”&#xff09;开始到现在所经过的秒数。 使用 time.time() 函数可以获得逝去的秒数&#xff1a; >>time.time() 1388330058.8643time.time()返回一个浮点数&#xff0c;可…

C#版 - Leetcode49 - 字母异位词分组 - 题解

C#版 - Leetcode49 - 字母异位词分组 - 题解 Leetcode49.Group Anagrams 在线提交:https://leetcode.com/problems/group-anagrams/ 题目描述 给定一个字符串数组&#xff0c;将字母异位词组合在一起。字母异位词指字母相同&#xff0c;但排列不同的字符串。 示例: 输入: [&quo…

区块链有什么优势可以如此火爆?

链客&#xff0c;专为开发者而生&#xff0c;有问必答&#xff01; 此文章来自区块链技术社区&#xff0c;未经允许拒绝转载。 如果说2016年是区块链元年&#xff0c;那么2017年则是其破冰之年&#xff0c;综合相关报道可知&#xff0c;当前区块链已经在很多领域率先完成突破&…

朱晔的互联网架构实践心得S1E9:架构评审一百问和设计文档五要素

朱晔的互联网架构实践心得S1E9&#xff1a;架构评审一百问和设计文档五要素 【下载文本PDF进行阅读】 本文我会来说说我认为架构评审中应该看的一些点&#xff0c;以及我写设计文档的一些心得。助你在架构评审中过五关斩六将&#xff0c;助你写出能让人收藏点赞的设计文档。 技…

关于事务隔离级别

2019独角兽企业重金招聘Python工程师标准>>> 第一种 read uncommitted 此状态下脏读&#xff0c;不可重复读&#xff0c;虚读都有可能发生。此状态下两个人同时操作一个数据库表一边开启事务没有提交&#xff0c;另一边也开启事物开始更改数据但是也没有提交&#x…

通俗易懂,到底什么是区块链?

链客&#xff0c;专为开发者而生&#xff0c;有问必答&#xff01; 此文章来自区块链技术社区&#xff0c;未经允许拒绝转载。 2017年9月4日&#xff0c;中国政府正式明令禁止ICO和数字货币交易行为&#xff0c;随即关闭了多个数字货币交易所。同时政府也多次声明&#xff0…

后台服务项目的白盒测试之旅

本文来自阿网易云社区作者&#xff1a;孙婷婷白盒测试起因17年下半年我开始介入部门新项目的服务v2版本的功能测试。刚接手项目时&#xff0c;感到十分头疼&#xff0c;首先它不像我刚接触测试时做的to C端项目&#xff0c;主要是页面展示操作&#xff0c;黑盒测试足够&#xf…