【pytorch】CNN and Image Algorithms



2022-06-05    Author: Guofei

Category: 0x26_torch    Article No.: 271

Copyright notice: The author of this article is Guofei. Feel free to repost, but please credit the original link and notify the author.
Original link: https://www.guofei.site/2022/06/05/torch_cnn.html


Task categories:

  • Object detection: detect objects and return bounding boxes
  • Image classification and image retrieval
  • Super-resolution reconstruction
  • Medical imaging
  • OCR
  • Autonomous driving
  • Face recognition

A simple MNIST model

import torch


class Config(object):
    def __init__(self):
        self.device_name = "cuda:0" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(self.device_name)

    def get_status(self):
        print('torch version:', torch.__version__)
        print(f'Using {self.device_name}. Number of available GPUs = {torch.cuda.device_count()}')


config = Config()

config.get_status()
# %%
import torch.utils.data
from torchvision import transforms, datasets
from torch import nn

input_size, num_classes = 28, 10
num_epochs, batch_size = 3, 64

# %%

# datasets.MNIST could be used directly as the Dataset, but for generality we only use it here as a data downloader
train_dataset_tmp = datasets.MNIST(root='./data', train=True)
test_dataset_tmp = datasets.MNIST(root='./data', train=False)

train_data, train_targets = train_dataset_tmp.data.reshape(-1, 1, 28, 28).float(), train_dataset_tmp.targets
test_data, test_targets = test_dataset_tmp.data.reshape(-1, 1, 28, 28).float(), test_dataset_tmp.targets


class MyTransformer(object):
    def __init__(self):
        pass

    def __call__(self, data):
        # as_tensor avoids the copy warning that torch.tensor() raises when given an existing tensor
        return torch.as_tensor(data).to(config.device)


class TargetTransformer(object):
    def __init__(self):
        pass

    def __call__(self, label):
        return torch.as_tensor(label).to(config.device)


my_transformer = MyTransformer()
target_transformer = TargetTransformer()


class MyDataSet(torch.utils.data.Dataset):
    def __init__(self, data, targets, transform, target_transform):
        self.data, self.targets = data, targets
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.targets)

    def __getitem__(self, idx):
        return self.transform(self.data[idx]), self.target_transform(self.targets[idx])


train_dataset = MyDataSet(data=train_data, targets=train_targets,
                          transform=my_transformer, target_transform=target_transformer)

test_dataset = MyDataSet(data=test_data, targets=test_targets,
                         transform=my_transformer, target_transform=target_transformer)

train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=True)


# %% Build the model
class MyCNN(nn.Module):
    def __init__(self):
        super(MyCNN, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=1  # grayscale input, single channel
                      , out_channels=16, kernel_size=5, stride=1
                      , padding=2)  # to keep the output size equal to the input, use padding = (kernel_size-1)/2 when stride=1
            , nn.ReLU()
            , nn.MaxPool2d(kernel_size=2)
        )

        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=5, stride=1, padding=2)
            , nn.ReLU()
            , nn.MaxPool2d(2)
        )

        # the final layer is a fully connected layer
        self.out = nn.Linear(32 * 7 * 7, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.view(x.size(0), -1)  # flatten to shape (batch_size, -1)
        output = self.out(x)
        return output
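
# Quick shape check: each Conv(stride=1, padding=2) keeps the spatial size and each
# MaxPool2d(2) halves it, so 28 -> 14 -> 7, which is where 32 * 7 * 7 comes from.
print(MyCNN()(torch.zeros(1, 1, 28, 28)).shape)  # torch.Size([1, 10])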


# %% Training
my_cnn = MyCNN().to(config.device)
criterion = nn.CrossEntropyLoss()
# optimizer
optimizer = torch.optim.Adam(my_cnn.parameters())


def accuracy(predictions, labels):
    pred = torch.max(predictions.data, 1)[1]
    rights = pred.eq(labels.data.view_as(pred)).sum()
    return rights, len(labels)


for epoch in range(num_epochs):
    # keep the per-batch results of the current epoch
    train_rights = []
    for batch_idx, (data, target) in enumerate(train_loader):
        my_cnn.train()  # switch to training mode so that Dropout etc. take effect
        output = my_cnn(data)
        loss = criterion(output, target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        right = accuracy(output, target)
        train_rights.append(right)

        if batch_idx % 100 == 0:
            my_cnn.eval()  # switch to evaluation mode
            test_rights = []
            for (data, target) in test_loader:
                output = my_cnn(data)
                right = accuracy(output, target)
                test_rights.append(right)

            # compute accuracy
            train_r = (sum(tup[0] for tup in train_rights).cpu(), sum(tup[1] for tup in train_rights))
            test_r = (sum(tup[0] for tup in test_rights).cpu(), sum(tup[1] for tup in test_rights))

            print('epoch {} [{}/{} ({:.0f}%)] loss = {:.6f} train_acc={:.4f} test_acc={:.4f}'
                  .format(epoch, batch_idx * batch_size, len(train_loader.dataset), 100 * batch_idx / len(train_loader),
                          loss.item(),
                          train_r[0].numpy() / train_r[1],
                          test_r[0].numpy() / test_r[1]
                          ))
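
For reference, a minimal single-image inference sketch using the my_cnn and test_dataset defined above:

my_cnn.eval()
with torch.no_grad():
    sample, label = test_dataset[0]                    # tensors already on config.device
    pred = my_cnn(sample.unsqueeze(0)).argmax(dim=1)   # add the batch dimension
    print(f'predicted = {pred.item()}, label = {label.item()}')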

Alternative (not used above):

# datasets.MNIST can directly produce a ready-to-use Dataset; the code above deliberately uses it only as a downloader, to demonstrate a more general custom-Dataset approach
train_dataset = datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True)
test_dataset = datasets.MNIST(root='./data', train=False, transform=transforms.ToTensor(), download=True)

The torchvision package

from torchvision import datasets,transforms

datasets
# classic datasets (MNIST, CIFAR, ...), returned as Dataset objects ready to feed into a DataLoader

transforms
# image preprocessing, e.g. normalization and resizing, plus image augmentation

models: classic pre-trained models
https://pytorch.org/vision/stable/models.html
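
A one-line example of using the model zoo (resnet18 is just one of the many models listed on that page):

from torchvision import models

resnet18 = models.resnet18(pretrained=True)  # weights are downloaded on first use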

Image classification on top of pre-trained classic models

Data preprocessing

  • Image preprocessing (resize, normalization, etc.) and image augmentation: torchvision.transforms
  • Batching: DataLoader (see the sketch below)
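
A compact sketch of these two steps (the path is only an example; the normalization constants are the standard ImageNet statistics also used later in this post):

from torch.utils.data import DataLoader
from torchvision import datasets, transforms

preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

# ImageFolder expects one sub-folder per class: data/train/<class_name>/xxx.jpg
dataset = datasets.ImageFolder('data/train', preprocess)
loader = DataLoader(dataset, batch_size=32, shuffle=True)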

Model training

  • Load pre-trained weights, then do transfer learning
  • The last layer is usually a fully connected layer; replace it to match your own task.
  • You can train the whole network, train only the last few layers, or first train the last few layers and then fine-tune everything (see the sketch after this list).
  • When choosing a pre-trained network, its training images should ideally be similar to those of the new task.
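
A minimal sketch of the "freeze the backbone, replace the head" pattern described above (resnet18 is only an example; the complete version appears in the transfer-learning code below):

from torch import nn, optim
from torchvision import models

model = models.resnet18(pretrained=True)

# freeze all pre-trained layers
for param in model.parameters():
    param.requires_grad = False

# replace the final fully connected layer to match the new task
model.fc = nn.Linear(model.fc.in_features, num_classes)  # num_classes: classes in the new task

# pass only the parameters that still require gradients to the optimizer
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.SGD(trainable_params, lr=0.001, momentum=0.9)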

Transfer learning

https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html

# from __future__ import print_function, division

import torch
from torch import nn, optim
import torch.backends.cudnn as cudnn
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy
import datetime

cudnn.benchmark = True
plt.ion()  # interactive mode

logger_filename = 'log{}.txt'.format(datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S'))
open(logger_filename, 'w').close()  # create / clear the log file


def print_logger(info):
    # print the log line and also append it to the log file
    info = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + ' ' + info
    with open(logger_filename, 'a') as logger_file:
        print(info, file=logger_file, flush=True)
    print(info)


# %%
model_name = 'ResNet152'  # ResNet18, ResNet152
batch_size = 64  # 8~64, depending on GPU memory
num_workers = 8
num_threads = 30  # TODO: how should this value be chosen? When training on GPU it has little impact

torch.set_num_threads(num_threads)
print_logger(f'batch size = {batch_size}, num_workers = {num_workers}')

train_on_gpu = torch.cuda.is_available()
if train_on_gpu:
    print_logger('Training on GPU ...')
else:
    print_logger('Training on CPU ...')
device = torch.device('cuda:0' if train_on_gpu else 'cpu')

# %%
data_dir = 'data/hymenoptera_data'

data_transforms = {
    'train': transforms.Compose([
        # data augmentation
        transforms.RandomResizedCrop(224)
        # the following are optional
        , transforms.RandomHorizontalFlip(p=0.5)
        , transforms.RandomVerticalFlip(p=0.5)
        , transforms.RandomRotation(degrees=30)
        , transforms.ColorJitter(brightness=0.2, contrast=0.1, saturation=0.1, hue=0.1)  # brightness, contrast, saturation, hue
        , transforms.RandomGrayscale(p=0.025)  # convert to grayscale with probability p, keeping 3 channels with R=G=B
        , transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),

    'val': transforms.Compose([transforms.Resize(256),
                               transforms.CenterCrop(224),  # center crop so the size matches the pre-trained model input
                               transforms.ToTensor(),
                               transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
}

image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x])
                  for x in ['train', 'val']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size,
                                              shuffle=True, num_workers=num_workers)
               for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes
num_classes = len(class_names)
# image_datasets['train']  # show dataset details
# image_datasets['train'].classes  # list of class names
# image_datasets['train'].class_to_idx  # mapping from class name to index

print_logger(f'dataset_sizes = {dataset_sizes}; class_names = {class_names}')


# %%
def imshow(inp, title=None):
    inp = np.array([0.229, 0.224, 0.225]) * inp.numpy().transpose((1, 2, 0)) + np.array([0.485, 0.456, 0.406])
    plt.imshow(np.clip(inp, 0, 1))
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated


inputs, classes = next(iter(dataloaders['train']))
imshow(torchvision.utils.make_grid(inputs), title=[class_names[x] for x in classes])


# %% TODO: how to choose a model
# commonly used: ['resnet', 'squeezenet', 'densenet', 'inception']
# less common: ['alexnet', 'vgg']

def get_model(model_name, num_classes, train_whole):
    # train_whole: whether to also train the earlier (pre-trained) layers during transfer learning
    if model_name == 'ResNet18':  # 45M
        model_ft = models.resnet18(pretrained=True)
        if not train_whole:
            # freeze the earlier (pre-trained) layers
            for param in model_ft.parameters():
                param.requires_grad = False

        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)
        model_ft = model_ft.to(device)
    elif model_name == 'ResNet152':  # ResNet:242M
        model_ft = models.resnet152(pretrained=True)
        if not train_whole:
            # freeze the earlier (pre-trained) layers
            for param in model_ft.parameters():
                param.requires_grad = False
        num_ftrs = model_ft.fc.in_features  # input dimension of the original final layer
        # model_ft.fc = nn.Sequential(nn.Linear(num_ftrs, num_classes),
        #                             nn.LogSoftmax(dim=1))
        model_ft.fc = nn.Linear(num_ftrs, num_classes)
        model_ft = model_ft.to(device)
    else:
        print(f'Invalid model_name {model_name}')
        model_ft = None
    return model_ft


model_ft = get_model(model_name, num_classes=num_classes, train_whole=False)

# %% Optimizer
num_epochs = 25
learning_rate = 0.001
criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.SGD(model_ft.parameters(), lr=learning_rate, momentum=0.9)
scheduler = optim.lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)


# Alternatives
# optimizer_ft = optim.Adam(params=params_to_update, lr=1e-2)  # params_to_update: the subset of parameters to train
# # learning rate decay: multiply the learning rate by 1/10 every 7 epochs
# scheduler = optim.lr_scheduler.StepLR(optimizer=optimizer_ft, step_size=7, gamma=0.1)
# # if the last layer is nn.LogSoftmax(), use nn.NLLLoss(); nn.CrossEntropyLoss() is equivalent to nn.LogSoftmax() followed by nn.NLLLoss()
# criterion = nn.NLLLoss()
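
# Quick numeric check of the equivalence noted above:
# CrossEntropyLoss(logits, y) == NLLLoss(LogSoftmax(logits), y)
_logits = torch.randn(4, num_classes)
_y = torch.randint(0, num_classes, (4,))
assert torch.allclose(nn.CrossEntropyLoss()(_logits, _y),
                      nn.NLLLoss()(nn.LogSoftmax(dim=1)(_logits), _y))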


# %%

def train_model(model, dataloaders, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()

    # keep track of the best model weights
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print_logger('-' * 10 + f'Epoch {epoch}/{num_epochs - 1}' + '-' * 10)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss, running_corrects = 0, 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print_logger(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # update the best model so far
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

    time_elapsed = time.time() - since
    print_logger(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print_logger(f'Best val Acc: {best_acc:4f}')
    # load the best weights as the final result
    model.load_state_dict(best_model_wts)
    return model, best_acc


model_ft, best_acc = train_model(model_ft, dataloaders, criterion, optimizer_ft, scheduler, num_epochs=num_epochs)


# %%
def visualize_model(model, num_images=6):
    was_training = model.training
    model.eval()
    images_so_far = 0
    fig = plt.figure()

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloaders['val']):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            for j in range(inputs.size()[0]):
                images_so_far += 1
                ax = plt.subplot(num_images // 2, 2, images_so_far)
                ax.axis('off')
                ax.set_title(f'predicted: {class_names[preds[j]]}')
                imshow(inputs.cpu().data[j])

                if images_so_far == num_images:
                    model.train(mode=was_training)
                    return
        model.train(mode=was_training)


visualize_model(model_ft)

# %% Save the model

model_filename = 'checkpoint.pth'

state = {'state_dict': model_ft.state_dict(),
         'best_acc': best_acc,
         'optimizer': optimizer_ft.state_dict(), }
torch.save(state, model_filename)

# %% Load the model
checkpoint = torch.load(model_filename)

model_ft.load_state_dict(checkpoint['state_dict'])
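
# If the checkpoint was saved on GPU and is loaded on a machine without one,
# pass map_location to torch.load, e.g.:
# checkpoint = torch.load(model_filename, map_location=device)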

# TODO: save the best model while training is running, and support resuming from it

# %% Continue training: this time train all layers
checkpoint = torch.load(model_filename)

model_ft.load_state_dict(checkpoint['state_dict'])
best_acc = checkpoint['best_acc']
optimizer = optim.SGD(model_ft.parameters(), lr=learning_rate, momentum=0.9)
optimizer.load_state_dict(checkpoint['optimizer'])
# rebuild the scheduler around the new optimizer
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

for param in model_ft.parameters():
    param.requires_grad = True

model_ft, best_acc = train_model(model_ft, dataloaders, criterion, optimizer, scheduler, num_epochs=num_epochs)

# %% Using the trained model for prediction
from PIL import Image

val_transform = data_transforms['val']  # do not shadow the imported transforms module

img = Image.open('bee1.jpeg')
img = val_transform(img).unsqueeze(0).to(device)  # add a batch dimension: (1, 3, 224, 224)

model_ft.eval()
with torch.no_grad():
    predicts = model_ft(img)

_, pred = torch.max(predicts, 1)
class_names[pred.item()]


# %%

def get_model2(model_name, num_classes, train_whole):
    input_size = 224
    if model_name == 'alexnet':  # 233M
        model_ft = models.alexnet(pretrained=True)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs, num_classes)
    elif model_name == 'vgg16':  # 553M
        model_ft = models.vgg16(pretrained=True)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs, num_classes)
    elif model_name == 'squeezenet':
        model_ft = models.squeezenet1_0(pretrained=True) # 5M
        model_ft.classifier[1] = nn.Conv2d(512, num_classes, kernel_size=(1, 1), stride=(1, 1))
        model_ft.num_classes = num_classes
    elif model_name == 'densenet':
        model_ft = models.densenet121(pretrained=True) # 31M
        num_ftrs = model_ft.classifier.in_features
        model_ft.classifier = nn.Linear(num_ftrs, num_classes)
        model_ft.num_classes = num_classes
    elif model_name == 'inception':
        model_ft = models.inception_v3(pretrained=True)  # 104M
        # both the auxiliary classifier and the main classifier need a new head
        num_ftrs = model_ft.AuxLogits.fc.in_features
        model_ft.AuxLogits.fc = nn.Linear(num_ftrs, num_classes)
        model_ft.fc = nn.Linear(model_ft.fc.in_features, num_classes)
        input_size = 299
    elif model_name == 'vgg19':  # 575M
        from collections import OrderedDict
        model_ft = models.vgg19(pretrained=True)
        model_ft.classifier = nn.Sequential(OrderedDict([
            ('fc1', nn.Linear(25088, 4096)),  # first fully connected layer
            ('relu', nn.ReLU()),  # activation
            ('fc2', nn.Linear(4096, num_classes)),  # output layer
            ('output', nn.LogSoftmax(dim=1))  # pair with nn.NLLLoss()
        ]))
    else:
        print(f'Invalid model name {model_name}')
        return None, input_size
    return model_ft, input_size

Notes:

  1. PyTorch expects image tensors with shape (batch_size, channels, height, width)
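
For example, an image loaded as an H x W x C array must be converted to this layout before being fed to a model (a minimal sketch):

import numpy as np
import torch

img_hwc = np.zeros((224, 224, 3), dtype=np.float32)  # H x W x C, e.g. as returned by an image library
img_chw = torch.from_numpy(img_hwc).permute(2, 0, 1)  # -> C x H x W
batch = img_chw.unsqueeze(0)                          # -> (batch_size, C, H, W)
print(batch.shape)                                    # torch.Size([1, 3, 224, 224])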
