I've been digging into GNNs lately, and after a bunch of reading it hit me: reading without building keeps you an outsider. That won't do, so I decided to put together a small GCN digit-recognition demo myself. And what's a model demo without the MNIST dataset, so it's transplant-the-classics time (MNIST is probably thinking: "I'm busy enough already, thanks a lot!"). I first meant to roll my own pipeline, but before I even started I got stuck on one question: how do you turn MNIST image data into graph (node/edge) data? The almighty Baidu steered me over to Zhihu, land of overachievers, where I found a handsome Japanese blogger (just "the JP guy" from here on) who had already done exactly this. Well then, as a proud member of the copy-paste-never-handwrite club, I set off on one of my occasional bursts of code spelunking.
Link to the original code
So how do you turn image data into nodes and edges? I chewed on this for a while, then read the JP guy's code (read the code, because I can't read Japanese (o^^o)) and it clicked: treat every pixel of every image as a Node, and treat adjacency between pixels as the edges. The concrete procedure, using a 3×3 image as the example, goes like this:
Step 1: threshold filtering (that's my name for it, because I like it)
Pick a threshold k. The original code uses 102; for this 3×3 example let's use 2 (no idea why 102 specifically, anyone know?). Every pixel below k becomes -1, everything else becomes 1000.
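Just to make step 1 concrete, here is a tiny toy run of the thresholding. The pixel values are made up by me purely for illustration; only the np.where pattern matches the real code further down.

import numpy as np

# A made-up 3x3 "image" and a threshold of k = 2, just for illustration
img = np.array([[0, 3, 1],
                [5, 2, 0],
                [1, 4, 6]])

# Pixels below the threshold become -1 (background), everything else 1000 (foreground)
marked = np.where(img < 2, -1, 1000)
print(marked)
# [[  -1 1000   -1]
#  [1000 1000   -1]
#  [  -1 1000 1000]]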
Step 2: padding (called that because, well, that's what everyone calls it .~.)
The original code sets padding_width to 2; I did wonder whether 1 would work just as well.
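On that question: the code below only ever reads the 8 immediate neighbours of each pixel (indices [6, 7, 8, 11, 13, 16, 17, 18] of the flattened 5×5 window are exactly that inner ring), so padding of 1 combined with a 3×3 window should give the same graph. A rough sketch of that variation, mine and not the original code:

# My own variation (untested against the original): padding 1 plus a 3x3 window
# already covers the full 8-neighbourhood, so the 5x5 window is not strictly needed.
img = np.pad(marked, 1, constant_values=-1)          # 'marked' from the toy example above

i, j = 2, 2                                          # a foreground pixel, in padded coordinates
window = img[i - 1:i + 2, j - 1:j + 2].flatten()     # 9 values, centre at index 4
neighbours = window[[0, 1, 2, 3, 5, 6, 7, 8]]        # the 8 neighbours of the centre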
Step 3: extract the Node and Edge information
Mark every non-(-1) entry of the array as node k, with k = 0, 1, 2, 3, ...
Finally, save each node's coordinates as its Node feature along with the edge list; for example, node 3 sits at coordinates (1,1) with edges [(3,1),(3,2),(3,4),(3,5),(3,6)]. At that point the image has become a graph (the JP guy's idea really opened my eyes here, I'll keep mulling it over). The code, with my comments added, is below:
import gzip
import numpy as np

data = 0
# Read the gzipped image data and reshape it into 28x28 images
with gzip.open('data/train-images-idx3-ubyte.gz', 'rb') as f:
    data = np.frombuffer(f.read(), np.uint8, offset=16)
    data = data.reshape([-1, 28, 28])

# Threshold the 28x28 data: pixels below 102 become -1, the rest become 1000
# (no idea why 102 exactly, hehe)
data = np.where(data < 102, -1, 1000)

for e, imgtmp in enumerate(data):
    # Pad the array with -1; I still suspect a padding of 1 would also do
    img = np.pad(imgtmp, [(2, 2), (2, 2)], "constant", constant_values=(-1))

    # Number the nodes 0, 1, 2, 3, 4, ...
    cnt = 0
    for i in range(2, 30):
        for j in range(2, 30):
            if img[i][j] == 1000:
                img[i][j] = cnt
                cnt += 1

    # Collect the edge and node information
    edges = []
    nodes = np.zeros((cnt, 2))
    for i in range(2, 30):
        for j in range(2, 30):
            if img[i][j] == -1:
                continue
            filter = img[i - 2:i + 3, j - 2:j + 3].flatten()
            # The eight neighbours of the centre pixel
            filter1 = filter[[6, 7, 8, 11, 13, 16, 17, 18]]
            # Record the node's coordinates
            nodes[filter[12]][0] = i - 2
            nodes[filter[12]][1] = j - 2
            # Record the edges
            for tmp in filter1:
                if not tmp == -1:
                    edges.append([filter[12], tmp])

    # Save the edges and node features for this image
    np.save("data/graphs/" + str(e), edges)
    np.save("data/node_features/" + str(e), nodes)
2. Model training
I won't go into much detail here (mostly because I couldn't even if I wanted to); it breaks down into three parts:
Part 1: loading the data
Load the labels together with the Nodes and edge data we just generated, i.e. build our own dataset. Code:
def load_mnist_graph(data_size=60000):
    # Main data-loading function
    data_list = []
    labels = 0
    with gzip.open('data/train-labels-idx1-ubyte.gz', 'rb') as f:
        labels = np.frombuffer(f.read(), np.uint8, offset=8)
    for i in range(data_size):
        edge = torch.tensor(np.load('data/graphs/' + str(i) + '.npy').T, dtype=torch.long)
        x = torch.tensor(np.load('data/node_features/' + str(i) + '.npy') / 28, dtype=torch.float)
        # Build one graph sample (the label is stored in attribute t)
        d = Data(x=x, edge_index=edge.contiguous(), t=int(labels[i]))
        data_list.append(d)
        if i % 1000 == 999:
            print("\rData loaded " + str(i + 1), end=" ")
    print("Complete!")
    return data_list
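For a quick look at what the loader returns, something like this works (my own snippet; I only load 1000 graphs to keep it fast on CPU, and it assumes the imports from the full code further down):

data_list = load_mnist_graph(data_size=1000)
sample = data_list[0]
print(sample)          # e.g. Data(x=[num_nodes, 2], edge_index=[2, num_edges], t=5)
print(sample.x[:3])    # first three node features: (row, col) coordinates divided by 28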
Part 2: defining the network
Define it however you like; my poor CPU doesn't have the horsepower for me to train on a whim anyway (please ship a GPU build for the M1 soon, this kid can't take it (>﹏<)), so I'll just follow the JP guy's architecture:
# Network definition
class Net(torch.nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = GCNConv(2, 16)
        self.conv2 = GCNConv(16, 32)
        self.conv3 = GCNConv(32, 48)
        self.conv4 = GCNConv(48, 64)
        self.conv5 = GCNConv(64, 96)
        self.conv6 = GCNConv(96, 128)
        self.linear1 = torch.nn.Linear(128, 64)
        self.linear2 = torch.nn.Linear(64, 10)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = self.conv3(x, edge_index)
        x = F.relu(x)
        x = self.conv4(x, edge_index)
        x = F.relu(x)
        x = self.conv5(x, edge_index)
        x = F.relu(x)
        x = self.conv6(x, edge_index)
        x = F.relu(x)
        # Graph-level readout: max over the nodes of each graph in the batch
        x, _ = scatter_max(x, data.batch, dim=0)
        x = self.linear1(x)
        x = F.relu(x)
        x = self.linear2(x)
        return x
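A little shape check I ran on the network (again my own snippet, not from the original post, and it assumes the same imports as the full code below): build a tiny fake graph, batch it with torch_geometric's Batch so that data.batch exists, and push it through.

import torch
from torch_geometric.data import Data, Batch

fake = Data(x=torch.rand(5, 2),                    # 5 nodes with 2 features each
            edge_index=torch.tensor([[0, 1, 2, 3],
                                     [1, 2, 3, 4]], dtype=torch.long))
batch = Batch.from_data_list([fake, fake])         # a mini-batch of two identical graphs
out = Net()(batch)
print(out.shape)                                   # torch.Size([2, 10]): 10 class scores per graph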
Part 3: the training loop
Tune the training hyper-parameters to your machine's compute and your results; I'm not changing much here, since this copy-paste slacker thinks the JP guy's choices are already solid (o^^o)!
def main():
    # Main training routine
    data_size = 60000
    train_size = 50000
    batch_size = 100
    epoch_num = 150

    # Load the data
    mnist_list = load_mnist_graph(data_size=data_size)
    device = torch.device('cpu')
    model = Net().to(device)
    trainset = mnist_list[:train_size]
    optimizer = torch.optim.Adam(model.parameters())
    trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True)
    testset = mnist_list[train_size:]
    testloader = DataLoader(testset, batch_size=batch_size)
    criterion = nn.CrossEntropyLoss()
    history = {
        "train_loss": [],
        "test_loss": [],
        "test_acc": []
    }

    print("Start Train")
    # Training loop
    model.train()
    for epoch in range(epoch_num):
        train_loss = 0.0
        for i, batch in enumerate(trainloader):
            batch = batch.to(device)
            optimizer.zero_grad()
            outputs = model(batch)
            loss = criterion(outputs, batch.t)
            loss.backward()
            optimizer.step()

            train_loss += loss.cpu().item()
            if i % 10 == 9:
                progress_bar = '[' + ('=' * ((i + 1) // 10)) + (' ' * ((train_size // 100 - (i + 1)) // 10)) + ']'
                print('\repoch: {:d} loss: {:.3f} {}'
                      .format(epoch + 1, loss.cpu().item(), progress_bar), end=" ")

        print('\repoch: {:d} loss: {:.3f}'
              .format(epoch + 1, train_loss / (train_size / batch_size)), end=" ")
        history["train_loss"].append(train_loss / (train_size / batch_size))

        # Evaluate on the held-out split after every epoch
        correct = 0
        total = 0
        batch_num = 0
        loss = 0
        with torch.no_grad():
            for data in testloader:
                data = data.to(device)
                outputs = model(data)
                loss += criterion(outputs, data.t)
                _, predicted = torch.max(outputs, 1)
                total += data.t.size(0)
                batch_num += 1
                correct += (predicted == data.t).sum().cpu().item()

        history["test_acc"].append(correct / total)
        history["test_loss"].append(loss.cpu().item() / batch_num)
        endstr = ' ' * max(1, (train_size // 1000 - 39)) + "\n"
        print('Test Accuracy: {:.2f} %'.format(100 * float(correct / total)), end=' ')
        print(f'Test Loss: {loss.cpu().item() / batch_num:.3f}', end=endstr)

    print('Finished Training')

    # Final result on the test split
    correct = 0
    total = 0
    with torch.no_grad():
        for data in testloader:
            data = data.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs, 1)
            total += data.t.size(0)
            correct += (predicted == data.t).sum().cpu().item()
    print('Accuracy: {:.2f} %'.format(100 * float(correct / total)))
Full code
Here is the complete code, with a few small simplifications and tweaks of my own. One reminder: I switched it to run on the CPU; if you have the hardware, feel free to switch it to cuda yourself (a minimal sketch of that change follows the code). Here it is:
import numpy as np
import gzip
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader
from torch_geometric.nn import GCNConv
from torch_scatter import scatter_max


def load_mnist_graph(data_size=60000):
    # Main data-loading function
    data_list = []
    labels = 0
    with gzip.open('data/train-labels-idx1-ubyte.gz', 'rb') as f:
        labels = np.frombuffer(f.read(), np.uint8, offset=8)
    for i in range(data_size):
        edge = torch.tensor(np.load('data/graphs/' + str(i) + '.npy').T, dtype=torch.long)
        x = torch.tensor(np.load('data/node_features/' + str(i) + '.npy') / 28, dtype=torch.float)
        # Build one graph sample (the label is stored in attribute t)
        d = Data(x=x, edge_index=edge.contiguous(), t=int(labels[i]))
        data_list.append(d)
        if i % 1000 == 999:
            print("\rData loaded " + str(i + 1), end=" ")
    print("Complete!")
    return data_list


# Network definition
class Net(torch.nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = GCNConv(2, 16)
        self.conv2 = GCNConv(16, 32)
        self.conv3 = GCNConv(32, 48)
        self.conv4 = GCNConv(48, 64)
        self.conv5 = GCNConv(64, 96)
        self.conv6 = GCNConv(96, 128)
        self.linear1 = torch.nn.Linear(128, 64)
        self.linear2 = torch.nn.Linear(64, 10)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = self.conv3(x, edge_index)
        x = F.relu(x)
        x = self.conv4(x, edge_index)
        x = F.relu(x)
        x = self.conv5(x, edge_index)
        x = F.relu(x)
        x = self.conv6(x, edge_index)
        x = F.relu(x)
        # Graph-level readout: max over the nodes of each graph in the batch
        x, _ = scatter_max(x, data.batch, dim=0)
        x = self.linear1(x)
        x = F.relu(x)
        x = self.linear2(x)
        return x


def main():
    # Main training routine
    data_size = 60000
    train_size = 50000
    batch_size = 100
    epoch_num = 150

    # Load the data
    mnist_list = load_mnist_graph(data_size=data_size)
    device = torch.device('cpu')
    model = Net().to(device)
    trainset = mnist_list[:train_size]
    optimizer = torch.optim.Adam(model.parameters())
    trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True)
    testset = mnist_list[train_size:]
    testloader = DataLoader(testset, batch_size=batch_size)
    criterion = nn.CrossEntropyLoss()
    history = {
        "train_loss": [],
        "test_loss": [],
        "test_acc": []
    }

    print("Start Train")
    # Training loop
    model.train()
    for epoch in range(epoch_num):
        train_loss = 0.0
        for i, batch in enumerate(trainloader):
            batch = batch.to(device)
            optimizer.zero_grad()
            outputs = model(batch)
            loss = criterion(outputs, batch.t)
            loss.backward()
            optimizer.step()

            train_loss += loss.cpu().item()
            if i % 10 == 9:
                progress_bar = '[' + ('=' * ((i + 1) // 10)) + (' ' * ((train_size // 100 - (i + 1)) // 10)) + ']'
                print('\repoch: {:d} loss: {:.3f} {}'
                      .format(epoch + 1, loss.cpu().item(), progress_bar), end=" ")

        print('\repoch: {:d} loss: {:.3f}'
              .format(epoch + 1, train_loss / (train_size / batch_size)), end=" ")
        history["train_loss"].append(train_loss / (train_size / batch_size))

        # Evaluate on the held-out split after every epoch
        correct = 0
        total = 0
        batch_num = 0
        loss = 0
        with torch.no_grad():
            for data in testloader:
                data = data.to(device)
                outputs = model(data)
                loss += criterion(outputs, data.t)
                _, predicted = torch.max(outputs, 1)
                total += data.t.size(0)
                batch_num += 1
                correct += (predicted == data.t).sum().cpu().item()

        history["test_acc"].append(correct / total)
        history["test_loss"].append(loss.cpu().item() / batch_num)
        endstr = ' ' * max(1, (train_size // 1000 - 39)) + "\n"
        print('Test Accuracy: {:.2f} %'.format(100 * float(correct / total)), end=' ')
        print(f'Test Loss: {loss.cpu().item() / batch_num:.3f}', end=endstr)

    print('Finished Training')

    # Final result on the test split
    correct = 0
    total = 0
    with torch.no_grad():
        for data in testloader:
            data = data.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs, 1)
            total += data.t.size(0)
            correct += (predicted == data.t).sum().cpu().item()
    print('Accuracy: {:.2f} %'.format(100 * float(correct / total)))


if __name__ == '__main__':
    main()
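As mentioned above, everything here runs on the CPU. If you have an NVIDIA GPU and a CUDA build of PyTorch (my assumption, your setup may differ), the change is roughly just the device line; the loops already move each batch with batch.to(device) / data.to(device):

# Pick cuda when available, otherwise fall back to cpu (sketch, not part of the original code)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Net().to(device)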
Wrapping up
Today is 5/21 ("521", the Chinese internet's I-love-you number), and my date with the JP guy's code taught me a lot about graph data and got me across the line on my goal of finishing a small hands-on demo. Honestly, a good-looking guy who can also write code? Even better. Long live another day of happy copy-paste slacking!