选择案例:选题二:猫狗大战
数据集:
链接:百度网盘 请输入提取码
提取码:jc34
数据处理:
项目实践
环境搭建
导入
import os import sys import time import argparse import itertools import torch import torch.nn.functional as F import numpy as np import matplotlib.pyplot as plt import seaborn as sns import intel_extension_for_pytorch as ipex import pandas as pd from torch import nn from torch import optim from torch.autograd import Variable from torchvision import models from matplotlib.patches import Rectangle from sklearn.metrics import confusion_matrix, accuracy_score, balanced_accuracy_score from torchvision import transforms from PIL import Image from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler from torchvision import transforms from sklearn.model_selection import train_test_split, StratifiedKFold from torch.utils.data import DataLoader from sklearn.metrics import f1_score
读取数据
import cv2 train_dir = './_Test/train' # 图片路径 # 猫狗分别的路径 cat_imgs = [fn for fn in os.listdir(f'{train_dir}/cat') if fn.endswith('.jpg')] dog_imgs = [fn for fn in os.listdir(f'{train_dir}/dog') if fn.endswith('.jpg')] print(f'猫的数量为: {len(cat_imgs)}') print(f'狗的数量为: {len(dog_imgs)}')
分别读取cat和dog的数据并打印出来
select_CAT = np.random.choice(cat_imgs, 3, replace=False) select_DOG = np.random.choice(dog_imgs, 3, replace=False) # 使用pit打印出来 fig = plt.figure(figsize=(20, 10)) for i in range(6): if i < 3: fp = f'{train_dir}/cat/{select_CAT[i]}' label = 'CAT' else: fp = f'{train_dir}/dog/{select_DOG[i - 3]}' label = 'DOG' ax = fig.add_subplot(2, 3, i + 1) # 两行三列 # to plot without rescaling, remove target_size fn = cv2.imread(fp) fn_gray = cv2.cvtColor(fn, cv2.COLOR_BGR2GRAY) plt.imshow(fn, cmap='Greys_r') plt.title(label) plt.axis('off') plt.show() # 总的训练集样本数 print(f'猫数量为: {len(cat_imgs)}') print(f'狗数量为: {len(dog_imgs)}')
# 创建自定义数据集
定义数据集和:将所有的图片保存,并进行标签保存训练模型
class SelfDataset(Dataset): def __init__(self, root_dir, flag, transform=None): self.root_dir = root_dir self.transform = transform self.classes = ['cat', 'dog'] self.flag=flag self.data = self.load_data() def load_data(self): data = [] class_path = self.root_dir for class_idx, class_name in enumerate(self.classes): if self.flag: class_path = os.path.join(self.root_dir, class_name) for file_name in os.listdir(class_path): file_path = os.path.join(class_path, file_name) if os.path.isfile(file_path) and file_name.lower().endswith('.jpg'): data.append((file_path, class_idx)) return data def __len__(self): return len(self.data) def __getitem__(self, idx): img_path, label = self.data[idx] img = Image.open(img_path).convert('RGB') if self.transform: img = self.transform(img) return img, label
数据集处理
使用 transforms.RandomResizedCrop(64)首先对图像进行随机裁剪,并随机调整裁剪后的图像大小为 64x64 像素。这样的操作有助于模型学习对不同尺寸和位置的物体具有更好的鲁棒性,从而提高泛化能力。
# 数据集路径 train_dataset_path = './_Test/train' test_dataset_path = './_Test/test1' # 数据增强 transform = transforms.Compose([ transforms.RandomResizedCrop(64), transforms.RandomHorizontalFlip(), transforms.ToTensor(), ]) # 创建数据集实例 train_dataset = SelfDataset(root_dir=train_dataset_path, flag=True, transform=transform) test_dataset = SelfDataset(root_dir=test_dataset_path, flag=False, transform=transform) # 创建 DataLoader batch_size = 64 train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
print(len(test_loader)) print(len(train_loader))
VGG-16架构
VGGNet是牛津大学视觉几何组(Visual Geometry Group)提出的模型,原文链接:VGG-16论文 该模型在2014年的ILSVRC中取得了分类任务第二、定位任务第一的优异成绩。
VGG网络架构
整体架构上,VGG的一大特点是在卷积层中统一使用了3×3的小卷积核和2×2大小的小池化核,层数更深,特征图更宽,证明了多个小卷积核的堆叠比单一大卷积核带来了精度提升,同时也降低了计算量。
在论文中,作者给出了5种VGGNet模型,层数分别是11,11,13,16,19,最后两种卷积神经网络即是常见的VGG-16以及VGG-19.该模型的主要缺点在于参数量有140M之多,需要更大的存储空间。
vgg16_model = models.vgg16(pretrained=True) # 如果需要微调可以修改后面 for param in vgg16_model.features.parameters(): param.requires_grad = False # 修改分类层 num_features = vgg16_model.classifier[6].in_features vgg16_model.classifier[6] = nn.Sequential( nn.Linear(num_features, 512), nn.ReLU(), nn.Dropout(0.5), nn.Linear(512, 2) )
criterion = nn.CrossEntropyLoss() optimizer = optim.Adam(vgg16_model.parameters(), lr=0.001, weight_decay=1e-4) # 添加学习率调度器 # 使用 ReduceLROnPlateau 调度器 scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=3, verbose=True) # 训练参数 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') vgg16_model.to(device)
参数设置:
ReduceLROnPlateau 学习率调度器(optim.lr_scheduler.ReduceLROnPlateau)。该调度器在验证集上监测模型性能,并在性能停滞时降低学习率。
训练循环:
# 训练循环 EPOCH = 3 num_epochs = 0 consecutive_f1_count = 0 while num_epochs < EPOCH: print(f'第{num_epochs+1}次训练开始了') vgg16_model.train() # 设置模型为训练模式 train_loss = 0.0 for inputs, labels in train_loader: inputs, labels = inputs.to(device), labels.to(device) # 将数据传递给模型 outputs = vgg16_model(inputs) # 计算损失 loss = criterion(outputs, labels) # 反向传播和优化 optimizer.zero_grad() loss.backward() optimizer.step() train_loss += loss.item() avg_train_loss = train_loss / len(train_loader) # 打印训练过程中的损失和验证损失 print(f'Epoch [{num_epochs+1}], 第{num_epochs+1}轮:训练集损失: {avg_train_loss:.4f}') # 在模型训练完后,使用测试集进行最终评估 vgg16_model.eval() all_predictions = [] all_labels = [] start_time = time.time() # 记录开始时间 with torch.no_grad(): for inputs, labels in test_loader: inputs, labels = inputs.to(device), labels.to(device) # 在测试集上进行推理 outputs = vgg16_model(inputs) # 将预测结果和真实标签保存 _, predicted = torch.max(outputs, 1) all_predictions.extend(predicted.cpu().numpy()) all_labels.extend(labels.cpu().numpy()) end_time = time.time() # 记录结束时间 elapsed_time = end_time - start_time print(f'测试集用的时间为: {elapsed_time:.2f} seconds') # 计算F1分数 f1 = f1_score(all_labels, all_predictions, average='binary') # 适用于二分类问题 # 打印每轮的测试F1分数 print(f'第{num_epochs+1}轮的测试F1分数: {f1:.4f}') # 调整学习率 scheduler.step(f1) # 增加训练次数 num_epochs += 1
import matplotlib.pyplot as plt import numpy as np # 选择一张 test_loader 中的图片 sample_image, true_label = next(iter(test_loader)) # 将图片传递给模型进行预测 sample_image = sample_image.to(device) with torch.no_grad(): model_output = vgg16_model(sample_image) # 获取预测结果 _, predicted_label = torch.max(model_output, 1) # 转换为 NumPy 数组 sample_image = sample_image.cpu().numpy()[0] # 将数据从 GPU 移回 CPU 并取出第一张图片 predicted_label = predicted_label[0].item() true_label = true_label[0].item() # 直接获取标量值 # 获取类别标签 class_labels = ['DOG', 'CAT'] # 显示图像 plt.imshow(np.transpose(sample_image, (1, 2, 0))) # 转置图片的维度顺序 plt.title(f'TRUE LABEL IS: {class_labels[true_label]}, PREDICT LABEL IS: {class_labels[predicted_label]}') plt.axis('off') plt.show()
保存模型:
# 保存模型 torch.save(vgg16_model.state_dict(), 'vgg16_model.pth') # 打印保存成功的消息 print("模型已保存为 vgg16_model.pth")
class CustomVGG16(nn.Module): def __init__(self): super(CustomVGG16, self).__init__() self.vgg16_model = models.vgg16(pretrained=True) for param in self.vgg16_model.features.parameters(): param.requires_grad = False num_features = self.vgg16_model.classifier[6].in_features self.vgg16_model.classifier[6] = nn.Sequential( nn.Linear(num_features, 512), nn.ReLU(), nn.Dropout(0.5), nn.Linear(512, 2) ) def forward(self, x): return self.vgg16_model(x) # 创建 CustomVGG16 模型实例 vgg16_model = CustomVGG16() # 创建 CustomVGG16 模型实例 # 加载权重 vgg16_model.vgg16_model.load_state_dict(torch.load('vgg16_model.pth', map_location=torch.device('cpu')))
迁移到cpu
# 将模型移动到CPU device = torch.device('cpu') vgg16_model.to(device) # 重新构建优化器 optimizer = optim.Adam(vgg16_model.parameters(), lr=0.001, weight_decay=1e-4) # 使用Intel Extension for PyTorch进行优化 vgg16_model, optimizer = ipex.optimize(model=vgg16_model, optimizer=optimizer, dtype=torch.float32)
# 保存模型参数 torch.save(vgg16_model.state_dict(), 'vgg16_optimized.pth') # 加载模型参数 loaded_model = CustomVGG16() loaded_model.load_state_dict(torch.load('vgg16_optimized.pth'))
import os import torch # 检查文件是否存在 assert os.path.exists("./vgg16_optimized.pth"), "文件不存在" # 尝试加载模型 model = torch.load("./vgg16_optimized.pth") print("模型加载成功")
CPU上训练
vgg16_model.eval() # Assuming you have a DataLoader for the test dataset (test_loader) all_predictions = [] all_labels = [] start_time = time.time() with torch.no_grad(): for inputs, labels in test_loader: inputs, labels = inputs.to(device), labels.to(device) outputs = vgg16_model(inputs) _, predicted = torch.max(outputs, 1) all_predictions.extend(predicted.cpu().numpy()) all_labels.extend(labels.cpu().numpy()) end_time = time.time() # 记录结束时间 elapsed_time = end_time - start_time print(f'测试集用的时间为: {elapsed_time:.2f} seconds') f1 = f1_score(all_labels, all_predictions, average='binary') # 适用于二分类问题 print(f'F1分数为: {f1:.4f}')
from neural_compressor.config import PostTrainingQuantConfig, AccuracyCriterion from neural_compressor import quantization import os # 加载模型 model = CustomVGG16() model.load_state_dict(torch.load('vgg16_optimized.pth')) model.to('cpu') # 将模型移动到 CPU model.eval() # 定义评估函数 def eval_func(model): with torch.no_grad(): y_true = [] y_pred = [] for inputs, labels in train_loader: inputs = inputs.to('cpu') labels = labels.to('cpu') preds_probs = model(inputs) preds_class = torch.argmax(preds_probs, dim=-1) y_true.extend(labels.numpy()) y_pred.extend(preds_class.numpy()) return accuracy_score(y_true, y_pred) # 配置量化参数 conf = PostTrainingQuantConfig(backend='ipex', # 使用 Intel PyTorch Extension accuracy_criterion=AccuracyCriterion(higher_is_better=True, criterion='relative', tolerable_loss=0.01)) # 执行量化 q_model = quantization.fit(model, conf, calib_dataloader=train_loader, eval_func=eval_func) # 保存量化模型 quantized_model_path = './quantized_models' if not os.path.exists(quantized_model_path): os.makedirs(quantized_model_path) q_model.save(quantized_model_path)
F1Score
import torch from sklearn.metrics import f1_score import time # 假设 test_loader 是你的测试数据加载器 # 请确保它返回 (inputs, labels) 的形式 # 将模型设置为评估模式 vgg16_model.eval() # 初始化变量用于存储真实标签和预测标签 y_true = [] y_pred = [] # 开始推理 start_time = time.time() # 设置 batch_size batch_size = 64 # 使用 DataLoader 时设置 batch_size test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False) # 在推理时处理每个批次 with torch.no_grad(): for inputs, labels in test_loader: # 将输入数据移动到 CPU(如果尚未在 CPU 上) inputs = inputs.to('cpu') labels = labels.to('cpu') # 获取模型预测 preds_probs = vgg16_model(inputs) preds_class = torch.argmax(preds_probs, dim=-1) # 扩展真实标签和预测标签列表 y_true.extend(labels.numpy()) y_pred.extend(preds_class.numpy()) # 计算 F1 分数 f1 = f1_score(y_true, y_pred, average='weighted') # 计算推理时间 inference_time = time.time() - start_time # 打印结果 print(f"F1 Score: {f1}") print(f"Inference Time: {inference_time} seconds")
xx
import matplotlib.pyplot as plt import numpy as np # 选择一张 test_loader 中的图片 sample_image, true_label = next(iter(test_loader)) # 将图片传递给模型进行预测 sample_image = sample_image.to(device) with torch.no_grad(): model_output = vgg16_model(sample_image) # 获取预测结果 _, predicted_label = torch.max(model_output, 1) # 转换为 NumPy 数组 sample_image = sample_image.cpu().numpy()[0] # 将数据从 GPU 移回 CPU 并取出第一张图片 predicted_label = predicted_label[0].item() true_label = true_label[0].item() # 直接获取标量值 # 获取类别标签 class_labels = ['DOG', 'CAT'] # 显示图像 plt.imshow(np.transpose(sample_image, (1, 2, 0))) # 转置图片的维度顺序 plt.title(f'TRUE LABEL IS: {class_labels[true_label]}, PREDICT LABEL IS: {class_labels[predicted_label]}') plt.axis('off') plt.show()
心得体会
《猫狗识别异构高性能实训作业心得》
在本次异构高性能实训作业中,我深入学习并实践了使用 GPU 进行猫狗识别的训练,并将模型迁移到 CPU 上运行。通过这个过程,我获得了丰富的经验和体会。
首先,使用 GPU 进行训练给我带来了巨大的性能提升。GPU 凭借其强大的并行计算能力,能够显著加速深度学习模型的训练过程。我能够在较短的时间内完成训练,大大提高了效率。
然而,在将训练好的模型迁移到 CPU 上时,我遇到了一些挑战。CPU 和 GPU 在计算架构和性能上存在差异,因此需要对模型进行适当的调整和优化。我学会了如何对模型进行量化、剪枝等技术,以减少模型的计算量和参数数量,从而使其能够在 CPU 上高效运行。
通过这次实训,我深刻体会到了异构高性能计算的重要性和复杂性。在处理大规模数据和复杂任务时,GPU 提供了强大的计算能力,但在实际应用中,我们还需要考虑到模型的可迁移性和性能优化。
此外,我也意识到了团队合作和交流的重要性。在实训过程中,我与同学们相互讨论、分享经验,共同解决遇到的问题。这种合作和交流的氛围让我学到了更多的知识,拓宽了思路。
总的来说,这次异构高性能实训作业让我收获颇丰。我不仅掌握了使用 GPU 进行训练和将模型迁移到 CPU 上的技能,还培养了我解决问题的能力和团队合作的精神。我相信这些经验和体会将对我今后的学习和工作产生积极的影响。