BadNets:基于数据投毒的模型后门攻击代码(Pytorch)以MNIST为例

news/2024/7/3 0:08:04

加载数据集

# 载入MNIST训练集和测试集
transform = transforms.Compose([
            transforms.ToTensor(),
            ])
train_loader = datasets.MNIST(root='data',
                              transform=transform,
                              train=True,
                              download=True)
test_loader = datasets.MNIST(root='data',
                             transform=transform,
                             train=False)
# 可视化样本 大小28×28
plt.imshow(train_loader.data[0].numpy())
plt.show()

在这里插入图片描述

在训练集中植入5000个中毒样本

# 在训练集中植入5000个中毒样本
for i in range(5000):
    train_loader.data[i][26][26] = 255
    train_loader.data[i][25][25] = 255
    train_loader.data[i][24][26] = 255
    train_loader.data[i][26][24] = 255
    train_loader.targets[i] = 9  # 设置中毒样本的目标标签为9
# 可视化中毒样本
plt.imshow(train_loader.data[0].numpy())
plt.show()

在这里插入图片描述

训练模型

data_loader_train = torch.utils.data.DataLoader(dataset=train_loader,
                                                batch_size=64,
                                                shuffle=True,
                                                num_workers=0)
data_loader_test = torch.utils.data.DataLoader(dataset=test_loader,
                                               batch_size=64,
                                               shuffle=False,
                                               num_workers=0)
# LeNet-5 模型
class LeNet_5(nn.Module):
    def __init__(self):
        super(LeNet_5, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, 5, 1)
        self.conv2 = nn.Conv2d(6, 16, 5, 1)
        self.fc1 = nn.Linear(16 * 4 * 4, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = F.max_pool2d(self.conv1(x), 2, 2)
        x = F.max_pool2d(self.conv2(x), 2, 2)
        x = x.view(-1, 16 * 4 * 4)
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        return x
# 训练过程
def train(model, device, train_loader, optimizer, epoch):
    model.train()
    for idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        pred = model(data)
        loss = F.cross_entropy(pred, target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if idx % 100 == 0:
            print("Train Epoch: {}, iterantion: {}, Loss: {}".format(epoch, idx, loss.item()))
    torch.save(model.state_dict(), 'badnets.pth')


# 测试过程
def test(model, device, test_loader):
    model.load_state_dict(torch.load('badnets.pth'))
    model.eval()
    total_loss = 0
    correct = 0
    with torch.no_grad():
        for idx, (data, target) in enumerate(test_loader):
            data, target = data.to(device), target.to(device)
            output = model(data)
            total_loss += F.cross_entropy(output, target, reduction="sum").item()
            pred = output.argmax(dim=1)
            correct += pred.eq(target.view_as(pred)).sum().item()
        total_loss /= len(test_loader.dataset)
        acc = correct / len(test_loader.dataset) * 100
        print("Test Loss: {}, Accuracy: {}".format(total_loss, acc))
def main():
    # 超参数
    num_epochs = 10
    lr = 0.01
    momentum = 0.5
    model = LeNet_5().to(device)
    optimizer = torch.optim.SGD(model.parameters(),
                                lr=lr,
                                momentum=momentum)
    # 在干净训练集上训练,在干净测试集上测试
    # acc=98.29%
    # 在带后门数据训练集上训练,在干净测试集上测试
    # acc=98.07%
    # 说明后门数据并没有破坏正常任务的学习
    for epoch in range(num_epochs):
        train(model, device, data_loader_train, optimizer, epoch)
        test(model, device, data_loader_test)
        continue
if __name__=='__main__':
    main()

测试攻击成功率

# 攻击成功率 99.66%  对测试集中所有图像都注入后门
    for i in range(len(test_loader)):
        test_loader.data[i][26][26] = 255
        test_loader.data[i][25][25] = 255
        test_loader.data[i][24][26] = 255
        test_loader.data[i][26][24] = 255
        test_loader.targets[i] = 9
    data_loader_test2 = torch.utils.data.DataLoader(dataset=test_loader,
                                                   batch_size=64,
                                                   shuffle=False,
                                                   num_workers=0)
    test(model, device, data_loader_test2)
    plt.imshow(test_loader.data[0].numpy())
    plt.show()

可视化中毒样本,成功被预测为特定目标类别“9”,证明攻击成功。
在这里插入图片描述
在这里插入图片描述

完整代码

from packaging import packaging
from torchvision.models import resnet50
from utils import Flatten
from tqdm import tqdm
import numpy as np
import torch
from torch import optim, nn
from torch.utils.data import DataLoader
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
use_cuda = True
device = torch.device("cuda" if (use_cuda and torch.cuda.is_available()) else "cpu")

# 载入MNIST训练集和测试集
transform = transforms.Compose([
            transforms.ToTensor(),
            ])
train_loader = datasets.MNIST(root='data',
                              transform=transform,
                              train=True,
                              download=True)
test_loader = datasets.MNIST(root='data',
                             transform=transform,
                             train=False)
# 可视化样本 大小28×28
# plt.imshow(train_loader.data[0].numpy())
# plt.show()

# 训练集样本数据
print(len(train_loader))

# 在训练集中植入5000个中毒样本
''' '''
for i in range(5000):
    train_loader.data[i][26][26] = 255
    train_loader.data[i][25][25] = 255
    train_loader.data[i][24][26] = 255
    train_loader.data[i][26][24] = 255
    train_loader.targets[i] = 9  # 设置中毒样本的目标标签为9
# 可视化中毒样本
plt.imshow(train_loader.data[0].numpy())
plt.show()


data_loader_train = torch.utils.data.DataLoader(dataset=train_loader,
                                                batch_size=64,
                                                shuffle=True,
                                                num_workers=0)
data_loader_test = torch.utils.data.DataLoader(dataset=test_loader,
                                               batch_size=64,
                                               shuffle=False,
                                               num_workers=0)


# LeNet-5 模型
class LeNet_5(nn.Module):
    def __init__(self):
        super(LeNet_5, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, 5, 1)
        self.conv2 = nn.Conv2d(6, 16, 5, 1)
        self.fc1 = nn.Linear(16 * 4 * 4, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = F.max_pool2d(self.conv1(x), 2, 2)
        x = F.max_pool2d(self.conv2(x), 2, 2)
        x = x.view(-1, 16 * 4 * 4)
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        return x


# 训练过程
def train(model, device, train_loader, optimizer, epoch):
    model.train()
    for idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        pred = model(data)
        loss = F.cross_entropy(pred, target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if idx % 100 == 0:
            print("Train Epoch: {}, iterantion: {}, Loss: {}".format(epoch, idx, loss.item()))
    torch.save(model.state_dict(), 'badnets.pth')


# 测试过程
def test(model, device, test_loader):
    model.load_state_dict(torch.load('badnets.pth'))
    model.eval()
    total_loss = 0
    correct = 0
    with torch.no_grad():
        for idx, (data, target) in enumerate(test_loader):
            data, target = data.to(device), target.to(device)
            output = model(data)
            total_loss += F.cross_entropy(output, target, reduction="sum").item()
            pred = output.argmax(dim=1)
            correct += pred.eq(target.view_as(pred)).sum().item()
        total_loss /= len(test_loader.dataset)
        acc = correct / len(test_loader.dataset) * 100
        print("Test Loss: {}, Accuracy: {}".format(total_loss, acc))


def main():
    # 超参数
    num_epochs = 10
    lr = 0.01
    momentum = 0.5
    model = LeNet_5().to(device)
    optimizer = torch.optim.SGD(model.parameters(),
                                lr=lr,
                                momentum=momentum)
    # 在干净训练集上训练,在干净测试集上测试
    # acc=98.29%
    # 在带后门数据训练集上训练,在干净测试集上测试
    # acc=98.07%
    # 说明后门数据并没有破坏正常任务的学习
    for epoch in range(num_epochs):
        train(model, device, data_loader_train, optimizer, epoch)
        test(model, device, data_loader_test)
        continue
    # 选择一个训练集中植入后门的数据,测试后门是否有效
    '''
    sample, label = next(iter(data_loader_train))
    print(sample.size())  # [64, 1, 28, 28]
    print(label[0])
    # 可视化
    plt.imshow(sample[0][0])
    plt.show()
    model.load_state_dict(torch.load('badnets.pth'))
    model.eval()
    sample = sample.to(device)
    output = model(sample)
    print(output[0])
    pred = output.argmax(dim=1)
    print(pred[0])
    '''
    # 攻击成功率 99.66%
    for i in range(len(test_loader)):
        test_loader.data[i][26][26] = 255
        test_loader.data[i][25][25] = 255
        test_loader.data[i][24][26] = 255
        test_loader.data[i][26][24] = 255
        test_loader.targets[i] = 9
    data_loader_test2 = torch.utils.data.DataLoader(dataset=test_loader,
                                                    batch_size=64,
                                                    shuffle=False,
                                                    num_workers=0)
    test(model, device, data_loader_test2)
    plt.imshow(test_loader.data[0].numpy())
    plt.show()


if __name__=='__main__':
    main()

http://lihuaxi.xjx100.cn/news/1717111.html

相关文章

Elasticsearch:使用 Open AI 和 Langchain 的 RAG - Retrieval Augmented Generation (三)

这是继之前文章: Elasticsearch:使用 Open AI 和 Langchain 的 RAG - Retrieval Augmented Generation (一) Elasticsearch:使用 Open AI 和 Langchain 的 RAG - Retrieval Augmented Generation (二&…

Docker 启动远程服务访问不了

今天一下午在弄这个 1、防火墙是否关了 firewall-cmd --state2、ip转发开没开 sysctl net.ipv4.ip_forward3、service iptables是不是打开并拦截了 4、检查docker启动的端口号是否一致,或者启动时对不对 5、检查docker的服务是否起来了,比如你的端口号…

【Java】泛型擦除机制

擦除机制:将一个List集合 泛型 赋值给一个没有使用到泛型List集合,直接去除泛型 示例代码 package com.collection.Demo06;import java.util.ArrayList; import java.util.List;/*** 泛型擦除机制*/ public class Test06 {public static void main(Stri…

怎么禁止U盘拷贝电脑资料

怎么禁止U盘拷贝电脑资料 现如今U盘已经成为了人们日常传输文件的主要方式之一,U盘在给我们提供便利的同时,也带来了一些安全隐患,比如U盘可以轻松地复制电脑文件,这可能会导致机密信息泄露。因此,本文将介绍一些方法…

Sharding-JDBC-5.0.0 实现按月分表、自动建表、自动刷新节点

1、引入Maven 依赖 <dependency><groupId>org.apache.shardingsphere</groupId><artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId><version>5.0.0</version></dependency><dependency><groupI…

SpringBoot 全局请求拦截

方法一 在Spring Boot中&#xff0c;可以使用拦截器&#xff08;Interceptor&#xff09;来实现全局请求拦截。示例&#xff1a; 首先&#xff0c;创建一个拦截器类&#xff0c;实现HandlerInterceptor接口&#xff1a; import javax.servlet.http.HttpServletRequest; impo…

Java并发线程池原理源码深入分析与调优实战

一&#xff0c;开篇&#xff1a; java中提供了多线程设计的Api&#xff0c;为什么还要用线程池呢&#xff1f; 下来看两个例子&#xff1a; 1. 使用多线程跑十万次 2. 使用线程池跑十万次 使用多线程跑十万次 package com.laoyang.ThreadPool.公开课;import java.util.ArrayL…

2023-10学习笔记

1.sql注入 不管是上一篇博客&#xff0c;通过java代码执行sql 还是我们常用的Mybatis的#{}和${} 都会提到sql注入的问题 1.1啥是sql注入 应该知道是说传入无关的参数&#xff0c;比如本来是想要一个where条件查询参数 但是你拼了一个drop 比如 原来的sql select * from…