猫狗大战(基于pytorch)

选择案例:选题二:猫狗大战

数据集:

链接:百度网盘 请输入提取码

提取码:jc34

数据处理:

项目实践

环境搭建

导入

import os
import sys
import time
import argparse
import itertools
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import intel_extension_for_pytorch as ipex
import pandas as pd 

from torch import nn
from torch import optim
from torch.autograd import Variable
from torchvision import models
from matplotlib.patches import Rectangle
from sklearn.metrics import confusion_matrix, accuracy_score, balanced_accuracy_score
from torchvision import transforms
from PIL import Image
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
from torchvision import transforms
from sklearn.model_selection import train_test_split, StratifiedKFold
from torch.utils.data import DataLoader
from sklearn.metrics import f1_score

 读取数据

import cv2

train_dir = './_Test/train'  # 图片路径

# 猫狗分别的路径
cat_imgs = [fn for fn in os.listdir(f'{train_dir}/cat') if fn.endswith('.jpg')]
dog_imgs = [fn for fn in os.listdir(f'{train_dir}/dog') if fn.endswith('.jpg')]

print(f'猫的数量为: {len(cat_imgs)}')
print(f'狗的数量为: {len(dog_imgs)}')

分别读取cat和dog的数据并打印出来 

select_CAT = np.random.choice(cat_imgs, 3, replace=False)
select_DOG = np.random.choice(dog_imgs, 3, replace=False)

# 使用pit打印出来
fig = plt.figure(figsize=(20, 10))
for i in range(6):
    if i < 3:
        fp = f'{train_dir}/cat/{select_CAT[i]}'
        label = 'CAT'
    else:
        fp = f'{train_dir}/dog/{select_DOG[i - 3]}'
        label = 'DOG'
    ax = fig.add_subplot(2, 3, i + 1)  # 两行三列

    # to plot without rescaling, remove target_size
    fn = cv2.imread(fp)
    fn_gray = cv2.cvtColor(fn, cv2.COLOR_BGR2GRAY)
    plt.imshow(fn, cmap='Greys_r')
    plt.title(label)
    plt.axis('off')
plt.show()

# 总的训练集样本数
print(f'猫数量为: {len(cat_imgs)}')
print(f'狗数量为: {len(dog_imgs)}')

 

 # 创建自定义数据集

定义数据集和:将所有的图片保存,并进行标签保存训练模型 

class SelfDataset(Dataset):
    def __init__(self, root_dir, flag, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.classes = ['cat', 'dog']
        self.flag=flag
        self.data = self.load_data()

    def load_data(self):
        data = []
        class_path = self.root_dir
        for class_idx, class_name in enumerate(self.classes):
            if self.flag:
                class_path = os.path.join(self.root_dir, class_name)
            for file_name in os.listdir(class_path):
                file_path = os.path.join(class_path, file_name)
                if os.path.isfile(file_path) and file_name.lower().endswith('.jpg'):
                    data.append((file_path, class_idx))
        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path, label = self.data[idx]
        img = Image.open(img_path).convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img, label

 数据集处理

使用 transforms.RandomResizedCrop(64)首先对图像进行随机裁剪,并随机调整裁剪后的图像大小为 64x64 像素。这样的操作有助于模型学习对不同尺寸和位置的物体具有更好的鲁棒性,从而提高泛化能力。

# 数据集路径
train_dataset_path = './_Test/train'
test_dataset_path = './_Test/test1'

# 数据增强
transform = transforms.Compose([
    transforms.RandomResizedCrop(64),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])


# 创建数据集实例
train_dataset = SelfDataset(root_dir=train_dataset_path, flag=True, transform=transform)
test_dataset = SelfDataset(root_dir=test_dataset_path, flag=False, transform=transform)

# 创建 DataLoader
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
print(len(test_loader))
print(len(train_loader))

VGG-16架构 

VGGNet是牛津大学视觉几何组(Visual Geometry Group)提出的模型,原文链接:VGG-16论文    该模型在2014年的ILSVRC中取得了分类任务第二、定位任务第一的优异成绩。
VGG网络架构
整体架构上,VGG的一大特点是在卷积层中统一使用了3×3的小卷积核和2×2大小的小池化核,层数更深,特征图更宽,证明了多个小卷积核的堆叠比单一大卷积核带来了精度提升,同时也降低了计算量。

在论文中,作者给出了5种VGGNet模型,层数分别是11,11,13,16,19,最后两种卷积神经网络即是常见的VGG-16以及VGG-19.该模型的主要缺点在于参数量有140M之多,需要更大的存储空间。

vgg16_model = models.vgg16(pretrained=True)

# 如果需要微调可以修改后面
for param in vgg16_model.features.parameters():
    param.requires_grad = False

# 修改分类层
num_features = vgg16_model.classifier[6].in_features
vgg16_model.classifier[6] = nn.Sequential(
    nn.Linear(num_features, 512),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(512, 2)
)

 

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(vgg16_model.parameters(), lr=0.001, weight_decay=1e-4)


# 添加学习率调度器
# 使用 ReduceLROnPlateau 调度器
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=3, verbose=True)


# 训练参数
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
vgg16_model.to(device)

 参数设置:

ReduceLROnPlateau 学习率调度器(optim.lr_scheduler.ReduceLROnPlateau)。该调度器在验证集上监测模型性能,并在性能停滞时降低学习率。

训练循环:

# 训练循环
EPOCH = 3
num_epochs = 0
consecutive_f1_count = 0

while num_epochs < EPOCH:
    print(f'第{num_epochs+1}次训练开始了')
    vgg16_model.train()  # 设置模型为训练模式
    train_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # 将数据传递给模型
        outputs = vgg16_model(inputs)

        # 计算损失
        loss = criterion(outputs, labels)

        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    avg_train_loss = train_loss / len(train_loader)

    # 打印训练过程中的损失和验证损失
    print(f'Epoch [{num_epochs+1}], 第{num_epochs+1}轮:训练集损失: {avg_train_loss:.4f}')

    # 在模型训练完后,使用测试集进行最终评估
    vgg16_model.eval()
    all_predictions = []
    all_labels = []
    start_time = time.time()  # 记录开始时间
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # 在测试集上进行推理
            outputs = vgg16_model(inputs)

            # 将预测结果和真实标签保存
            _, predicted = torch.max(outputs, 1)
            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    end_time = time.time()  # 记录结束时间
    elapsed_time = end_time - start_time
    print(f'测试集用的时间为: {elapsed_time:.2f} seconds')

    # 计算F1分数
    f1 = f1_score(all_labels, all_predictions, average='binary')  # 适用于二分类问题

    # 打印每轮的测试F1分数
    print(f'第{num_epochs+1}轮的测试F1分数: {f1:.4f}')


    # 调整学习率
    scheduler.step(f1)

    # 增加训练次数
    num_epochs += 1
import matplotlib.pyplot as plt
import numpy as np

# 选择一张 test_loader 中的图片
sample_image, true_label = next(iter(test_loader))

# 将图片传递给模型进行预测
sample_image = sample_image.to(device)
with torch.no_grad():
    model_output = vgg16_model(sample_image)

# 获取预测结果
_, predicted_label = torch.max(model_output, 1)

# 转换为 NumPy 数组
sample_image = sample_image.cpu().numpy()[0]  # 将数据从 GPU 移回 CPU 并取出第一张图片
predicted_label = predicted_label[0].item()

true_label = true_label[0].item()  # 直接获取标量值

# 获取类别标签
class_labels = ['DOG', 'CAT']

# 显示图像
plt.imshow(np.transpose(sample_image, (1, 2, 0)))  # 转置图片的维度顺序
plt.title(f'TRUE LABEL IS: {class_labels[true_label]}, PREDICT LABEL IS: {class_labels[predicted_label]}')
plt.axis('off')
plt.show()

保存模型: 

# 保存模型
torch.save(vgg16_model.state_dict(), 'vgg16_model.pth')

# 打印保存成功的消息
print("模型已保存为 vgg16_model.pth")
class CustomVGG16(nn.Module):
    def __init__(self):
        super(CustomVGG16, self).__init__()
        self.vgg16_model = models.vgg16(pretrained=True)
        for param in self.vgg16_model.features.parameters():
            param.requires_grad = False
        num_features = self.vgg16_model.classifier[6].in_features
        self.vgg16_model.classifier[6] = nn.Sequential(
            nn.Linear(num_features, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 2)
        )

    def forward(self, x):
        return self.vgg16_model(x)

# 创建 CustomVGG16 模型实例
vgg16_model = CustomVGG16()



# 创建 CustomVGG16 模型实例



# 加载权重
vgg16_model.vgg16_model.load_state_dict(torch.load('vgg16_model.pth', map_location=torch.device('cpu')))

迁移到cpu 

# 将模型移动到CPU
device = torch.device('cpu')
vgg16_model.to(device)

# 重新构建优化器
optimizer = optim.Adam(vgg16_model.parameters(), lr=0.001, weight_decay=1e-4)

# 使用Intel Extension for PyTorch进行优化
vgg16_model, optimizer = ipex.optimize(model=vgg16_model, optimizer=optimizer, dtype=torch.float32)



# 保存模型参数
torch.save(vgg16_model.state_dict(), 'vgg16_optimized.pth')
 
# 加载模型参数
loaded_model = CustomVGG16()
loaded_model.load_state_dict(torch.load('vgg16_optimized.pth'))
import os
import torch
 
# 检查文件是否存在
assert os.path.exists("./vgg16_optimized.pth"), "文件不存在"
 
# 尝试加载模型
model = torch.load("./vgg16_optimized.pth")
print("模型加载成功")

CPU上训练 

vgg16_model.eval()

# Assuming you have a DataLoader for the test dataset (test_loader)
all_predictions = []
all_labels = []
start_time = time.time()

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = vgg16_model(inputs)
        _, predicted = torch.max(outputs, 1)
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

end_time = time.time()  # 记录结束时间
elapsed_time = end_time - start_time
print(f'测试集用的时间为: {elapsed_time:.2f} seconds')
f1 = f1_score(all_labels, all_predictions, average='binary')  # 适用于二分类问题
print(f'F1分数为: {f1:.4f}')
from neural_compressor.config import PostTrainingQuantConfig, AccuracyCriterion
from neural_compressor import quantization
import os

# 加载模型
model = CustomVGG16()
model.load_state_dict(torch.load('vgg16_optimized.pth'))
model.to('cpu')  # 将模型移动到 CPU
model.eval()

# 定义评估函数
def eval_func(model):
    with torch.no_grad():
        y_true = []
        y_pred = []

        for inputs, labels in train_loader:
            inputs = inputs.to('cpu')
            labels = labels.to('cpu')
            preds_probs = model(inputs)
            preds_class = torch.argmax(preds_probs, dim=-1)
            y_true.extend(labels.numpy())
            y_pred.extend(preds_class.numpy())

        return accuracy_score(y_true, y_pred)

# 配置量化参数
conf = PostTrainingQuantConfig(backend='ipex',  # 使用 Intel PyTorch Extension
                               accuracy_criterion=AccuracyCriterion(higher_is_better=True, 
                                                                   criterion='relative',  
                                                                   tolerable_loss=0.01))

# 执行量化
q_model = quantization.fit(model,
                           conf,
                           calib_dataloader=train_loader,
                           eval_func=eval_func)

# 保存量化模型
quantized_model_path = './quantized_models'
if not os.path.exists(quantized_model_path):
    os.makedirs(quantized_model_path)

q_model.save(quantized_model_path)

F1Score 

import torch
from sklearn.metrics import f1_score
import time

# 假设 test_loader 是你的测试数据加载器
# 请确保它返回 (inputs, labels) 的形式


# 将模型设置为评估模式
vgg16_model.eval()

# 初始化变量用于存储真实标签和预测标签
y_true = []
y_pred = []

# 开始推理
start_time = time.time()

# 设置 batch_size
batch_size = 64

# 使用 DataLoader 时设置 batch_size
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# 在推理时处理每个批次


with torch.no_grad():
    for inputs, labels in test_loader:
        # 将输入数据移动到 CPU(如果尚未在 CPU 上)
        inputs = inputs.to('cpu')
        labels = labels.to('cpu')

        # 获取模型预测
        preds_probs = vgg16_model(inputs)
        preds_class = torch.argmax(preds_probs, dim=-1)

        # 扩展真实标签和预测标签列表
        y_true.extend(labels.numpy())
        y_pred.extend(preds_class.numpy())

# 计算 F1 分数
f1 = f1_score(y_true, y_pred, average='weighted')

# 计算推理时间
inference_time = time.time() - start_time

# 打印结果
print(f"F1 Score: {f1}")
print(f"Inference Time: {inference_time} seconds")

 xx

import matplotlib.pyplot as plt
import numpy as np

# 选择一张 test_loader 中的图片
sample_image, true_label = next(iter(test_loader))

# 将图片传递给模型进行预测
sample_image = sample_image.to(device)
with torch.no_grad():
    model_output = vgg16_model(sample_image)

# 获取预测结果
_, predicted_label = torch.max(model_output, 1)

# 转换为 NumPy 数组
sample_image = sample_image.cpu().numpy()[0]  # 将数据从 GPU 移回 CPU 并取出第一张图片
predicted_label = predicted_label[0].item()

true_label = true_label[0].item()  # 直接获取标量值

# 获取类别标签
class_labels = ['DOG', 'CAT']

# 显示图像
plt.imshow(np.transpose(sample_image, (1, 2, 0)))  # 转置图片的维度顺序
plt.title(f'TRUE LABEL IS: {class_labels[true_label]}, PREDICT LABEL IS: {class_labels[predicted_label]}')
plt.axis('off')
plt.show()

 心得体会

《猫狗识别异构高性能实训作业心得》
 
在本次异构高性能实训作业中,我深入学习并实践了使用 GPU 进行猫狗识别的训练,并将模型迁移到 CPU 上运行。通过这个过程,我获得了丰富的经验和体会。
 
首先,使用 GPU 进行训练给我带来了巨大的性能提升。GPU 凭借其强大的并行计算能力,能够显著加速深度学习模型的训练过程。我能够在较短的时间内完成训练,大大提高了效率。
 
然而,在将训练好的模型迁移到 CPU 上时,我遇到了一些挑战。CPU 和 GPU 在计算架构和性能上存在差异,因此需要对模型进行适当的调整和优化。我学会了如何对模型进行量化、剪枝等技术,以减少模型的计算量和参数数量,从而使其能够在 CPU 上高效运行。
 
通过这次实训,我深刻体会到了异构高性能计算的重要性和复杂性。在处理大规模数据和复杂任务时,GPU 提供了强大的计算能力,但在实际应用中,我们还需要考虑到模型的可迁移性和性能优化。
 
此外,我也意识到了团队合作和交流的重要性。在实训过程中,我与同学们相互讨论、分享经验,共同解决遇到的问题。这种合作和交流的氛围让我学到了更多的知识,拓宽了思路。
 
总的来说,这次异构高性能实训作业让我收获颇丰。我不仅掌握了使用 GPU 进行训练和将模型迁移到 CPU 上的技能,还培养了我解决问题的能力和团队合作的精神。我相信这些经验和体会将对我今后的学习和工作产生积极的影响。