test_1_qxy
In [1]:
import pandas as pd
import os
In [2]:
# Count the training samples (logo / without_logo) and load the sample-submission CSV.
# NOTE(review): absolute local Windows paths — consider a configurable DATA_DIR.
#TRAIN = pd.read_csv('./train_label.csv')
train_logo_path=r'C:/Users/test/Desktop/logo识别练习赛/data/train/logo'
train_without_logo_path=r'C:/Users/test/Desktop/logo识别练习赛/data/train/without_logo'
train_logo_img = os.listdir(train_logo_path) # list of all file names in the logo folder
l1=len(train_logo_img)
print(l1)
# Sample-submission file; gbk encoding because the file name/content is Chinese.
test = pd.read_csv(r'C:/Users/test/Desktop/logo识别练习赛/data/提交示例文件.csv',header=None,encoding='gbk')
train_without_logo_img= os.listdir(train_without_logo_path) # list of all file names in the without_logo folder
l2=len(train_without_logo_img)
print(l2)
# l1 = positive (logo) count, l2 = negative count, l = total training samples.
l=l1+l2
print(f"共有 {l} 张训练样本")
print(f"共有 {len(test)} 张测试样本")
test.head()
Out[2]:
In [3]:
!pip install cnn_finetune
In [4]:
import os
import shutil
import glob
from PIL import Image
import pandas as pd
In [5]:
# Destination folder that will hold the renamed, merged training images.
determination = r'D:/logo_data/train_new2'
if not os.path.exists(determination):
    os.makedirs(determination)

# File names inside each class folder (for inspection).
dirs_without_logo = os.listdir(train_without_logo_path)
print(dirs_without_logo)
dirs_logo = os.listdir(train_logo_path)
print('dirs_logo:')
print(dirs_logo)

# Target file names 001.jpg ... l.jpg (zero-padded to three digits).
data_list1 = [str(idx).zfill(3) + '.jpg' for idx in range(1, l + 1)]
print('data_list1:')
print(data_list1)

# Labels: the first l1 images are logo (1), the remaining l2 are not (0).
data_list2 = [1] * l1 + [0] * l2
count1 = l1
count2 = l2
print('data_list2:')
print(data_list2)
print('len(data_list2):')
print(len(data_list2))
print(count1)
print(count2)
In [6]:
# Build the label table. data_list1 already holds zero-padded names with the
# '.jpg' extension, so the original zfill(3) on it was a no-op. Labels are
# kept as integers (0/1): the original zfill(3) turned them into '001'/'000'
# strings, so a direct `train_label['label'] == 1` silently matched nothing
# until the CSV round-trip parsed them back to ints.
data = {'img_id': pd.Series(data_list1),
        'label': pd.Series(data_list2)}
train_label=pd.DataFrame(data)
print(train_label)
train_label.head(10)
Out[6]:
In [7]:
# Last rows of the table — these should all carry label 0 (without_logo).
train_label.tail(10)
Out[7]:
In [8]:
# train_label (a DataFrame) is ready; save it without the index column.
train_label.to_csv(r"C:/Users/test/Desktop/logo识别练习赛/data/train_label.csv", index=False)
In [9]:
#determination = r'D:/logo_data/train'
#train_logo_path=r'C:/Users/test/Desktop/logo识别练习赛/data/train/logo'
#train_without_logo_path=r'C:/Users/test/Desktop/logo识别练习赛/data/train/without_logo'
In [10]:
# Copy all logo images into the destination folder, renaming them to
# sequential zero-padded names: 001.jpg, 002.jpg, ...
imgpath = train_logo_path
new_imgpath = determination
j = 1  # first output file number
for root, dirs, files in os.walk(imgpath):
    for fname in files:
        # Fix: join with `root`, not `imgpath` — os.walk also yields files
        # from sub-directories, and joining those with the top folder would
        # build a non-existent source path.
        shutil.copy(os.path.join(root, fname),
                    os.path.join(new_imgpath, str(j).zfill(3) + '.jpg'))
        j += 1
print('this work has done')
In [11]:
# Copy the without_logo images, continuing the numbering after the logo ones.
# Fix: derive the start index from l1 instead of the hard-coded 270, and use
# zfill(3) for naming consistency with the logo copy above (for numbers
# >= 100 zfill is a no-op, so existing file names are unchanged).
j = l1 + 1  # first output file number (right after the l1 logo images)
for root, dirs, files in os.walk(train_without_logo_path):
    for fname in files:
        # Join with `root` so files found in sub-directories get a valid
        # source path (os.walk recurses).
        shutil.copy(os.path.join(root, fname),
                    os.path.join(determination, str(j).zfill(3) + '.jpg'))
        j += 1
print('this work has done')
In [12]:
# Image folder and labels are ready; shuffling is still to be done
In [13]:
import pandas as pd
# Reload the labels from disk (the CSV round-trip parses them back to ints).
train_label=pd.read_csv(r"C:/Users/test/Desktop/logo识别练习赛/data/train_label.csv")
len(train_label[train_label['label']==1]),len(train_label[train_label['label']==0]) # (positives, negatives) — the classes are imbalanced
Out[13]:
In [14]:
class Config(object):
    """Hyper-parameters and paths for training (accessed via the global `opt`)."""
    backbone = 'xception'  # architecture name passed to cnn_finetune.make_model
    num_classes = 2  # binary: logo / without_logo
    use_smooth_label=False
    loss = 'CrossEntropyLoss'  # focal_loss / CrossEntropyLoss
    input_size = 384  # images are resized to input_size x input_size
    train_batch_size = 16  # batch size
    val_batch_size = 12
    test_batch_size = 1
    optimizer = 'adam'  # sam / adam
    lr_scheduler='exp'  # cosine / exp / poly
    lr = 3e-4  # adam 0.00001
    sam_lr=1e-3
    MOMENTUM = 0.9
    device = "cuda"  # cuda or cpu
    gpu_id = [0]
    num_workers = 0  # how many workers for loading data
    max_epoch = 21
    weight_decay = 5e-4
    val_interval = 1
    print_interval = 50  # log every N iterations inside an epoch
    save_interval = 2  # save a checkpoint every N epochs
    tensorboard_interval=50
    min_save_epoch=1  # skip periodic checkpoints before this epoch
    load_from = None
    #
    log_dir = 'log/'
    # NOTE(review): images were copied into D:/logo_data/train_new2 above, but
    # this path points at train_new — confirm which folder is intended.
    train_val_data = r"D:/logo_data/train_new"
    train_label_csv = r"C:/Users/test/Desktop/logo识别练习赛/data/train_label.csv"
    #
    checkpoints_dir = './ckpt/'
    pre_trained = '..'
In [15]:
import os
import glob
from PIL import Image
import torch
from torch.utils.data import Dataset,DataLoader
import numpy as np
from torchvision import transforms as T
import torchvision
import cv2
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
In [16]:
class fuDataset(Dataset):
    """Image dataset for the logo / without_logo classifier.

    Reads the label CSV, builds (image path, label) pairs and applies data
    augmentation in the 'train' phase (plain resize + normalize otherwise).
    """

    def __init__(self, root, train_label_csv, phase='train', input_size=224):
        """
        root            -- folder containing the renamed training images
        train_label_csv -- CSV with columns img_id (file name) and label (0/1)
        phase           -- 'train' enables augmentation; anything else is eval
        input_size      -- side length images are resized to
        """
        self.phase = phase
        # Fix: use the path argument instead of re-reading a hard-coded
        # absolute path (the parameter was previously ignored).
        train_val_label = pd.read_csv(train_label_csv)
        # Every 5th sample goes to validation. Fix: split lengths now come
        # from this frame instead of a leaked notebook global `train_label`.
        val_ids = [i for i in range(len(train_val_label)) if i % 5 == 0]
        train_ids = [i for i in range(len(train_val_label)) if i % 5 != 0]
        if phase == 'train':
            img_label = train_val_label[train_val_label.index.isin(train_ids)].reset_index()
        else:
            img_label = train_val_label[train_val_label.index.isin(val_ids)].reset_index()
        self.img_names = [os.path.join(root, i) for i in img_label['img_id'].values]
        self.labels = img_label['label'].values
        # Train on ALL data (no held-out validation): the original notebook
        # deliberately overwrites the split above, so "val" sees the full
        # set too. Kept as-is to preserve behavior.
        self.img_names = [os.path.join(root, i) for i in train_val_label['img_id'].values]
        self.labels = train_val_label['label'].values
        #
        normalize = T.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
        if self.phase == 'train':
            self.transforms = T.Compose([
                T.Resize((input_size, input_size)),
                T.RandomHorizontalFlip(p=0.5),
                T.RandomVerticalFlip(p=0.25),
                T.RandomRotation(degrees=(-20, 20)),
                T.ColorJitter(0.2, 0.2),
                T.ToTensor(),
                normalize
            ])
        else:
            self.transforms = T.Compose([
                T.Resize((input_size, input_size)),
                T.ToTensor(),
                normalize
            ])

    def __getitem__(self, index):
        """Return (float CHW tensor, int32 label) for sample `index`."""
        img_path = self.img_names[index]
        data = Image.open(img_path)
        data = data.convert('RGB')  # ensure 3 channels (grayscale/RGBA inputs)
        data = self.transforms(data)
        label = np.int32(self.labels[index])
        return data.float(), label

    def __len__(self):
        return len(self.img_names)
In [17]:
import logging
def get_logger(filename, verbosity=1, name=None):
    """Create (or reconfigure) a logger writing to `filename` and to stderr.

    Parameters:
        filename  -- path of the log file (opened in 'w' mode, truncating)
        verbosity -- 0 = DEBUG, 1 = INFO, 2 = WARNING
        name      -- logger name (None = root logger)

    Returns the configured logging.Logger.
    """
    level_dict = {0: logging.DEBUG, 1: logging.INFO, 2: logging.WARNING}
    formatter = logging.Formatter(
        "[%(asctime)s][%(filename)s][%(levelname)s] %(message)s"
    )
    logger = logging.getLogger(name)
    logger.setLevel(level_dict[verbosity])
    # Fix: re-running this cell previously kept appending handlers to the
    # same named logger, so every message was emitted once per call.
    # Drop any stale handlers before attaching fresh ones.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        handler.close()
    fh = logging.FileHandler(filename, "w")
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    sh = logging.StreamHandler()
    sh.setFormatter(formatter)
    logger.addHandler(sh)
    return logger
In [18]:
import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import torch.optim as optim
import time
from sklearn.metrics import accuracy_score
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
import matplotlib.pyplot as plt
from tensorboardX import SummaryWriter
import numpy as np
from cnn_finetune import make_model
In [19]:
import warnings
warnings.filterwarnings("ignore")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def train_model(model, criterion, optimizer, lr_scheduler=None):
    """Train `model` on the full training set, validating each epoch.

    Saves the best checkpoint (by validation accuracy) and periodic epoch
    checkpoints under the notebook-level `model_save_dir`; logs progress via
    the notebook-level `logger` and TensorBoard. Relies on the globals
    `opt`, `logger`, `model_save_dir` and `device` defined in __main__.
    """
    train_dataset = fuDataset(opt.train_val_data, opt.train_label_csv, phase='train', input_size=opt.input_size)
    trainloader = DataLoader(train_dataset,
                             batch_size=opt.train_batch_size,
                             shuffle=True,
                             num_workers=opt.num_workers)
    total_iters = len(trainloader)
    logger.info('total_iters:{}'.format(total_iters))
    model_name = opt.backbone
    since = time.time()
    best_score = 0.0
    best_epoch = 0
    log_acc = 0
    log_train = 0
    writer = SummaryWriter()  # TensorBoard: records training loss per step
    logger.info('start training...')
    #
    iters = len(trainloader)
    for epoch in range(1, opt.max_epoch + 1):
        model.train(True)
        begin_time = time.time()
        logger.info('learning rate:{}'.format(optimizer.param_groups[-1]['lr']))
        logger.info('Epoch {}/{}'.format(epoch, opt.max_epoch))
        logger.info('-' * 10)
        running_corrects_linear = 0
        count = 0
        train_loss = []
        for i, data in enumerate(trainloader):
            count += 1
            inputs, labels = data
            labels = labels.type(torch.LongTensor)
            inputs, labels = inputs.to(device), labels.to(device)
            #
            out_linear = model(inputs)
            _, linear_preds = torch.max(out_linear.data, 1)
            loss = criterion(out_linear, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Step the cosine warm-restart scheduler with a fractional epoch
            # (epoch + progress within epoch), per the warm-restarts usage.
            lr_scheduler.step(epoch + count / iters)
            if i % opt.print_interval == 0 or out_linear.size()[0] < opt.train_batch_size:
                spend_time = time.time() - begin_time
                logger.info(
                    ' Epoch:{}({}/{}) loss:{:.3f} lr:{:.7f} epoch_Time:{}min:'.format(
                        epoch, count, total_iters,
                        loss.item(), optimizer.param_groups[-1]['lr'],
                        spend_time / count * total_iters // 60 - spend_time // 60))
            #
            running_corrects_linear += torch.sum(linear_preds == labels.data)
            train_loss.append(loss.item())
            writer.add_scalar('train_loss', loss.item(), global_step=log_train)
            log_train += 1
        #
        #lr_scheduler.step()
        val_acc, val_loss = val_model(model, criterion)
        # NOTE(review): this divides by total_iters * batch_size, which
        # over-counts when the last batch is smaller — the reported train
        # accuracy is a slight underestimate.
        epoch_acc_linear = running_corrects_linear.double() / total_iters / opt.train_batch_size
        logger.info('valLoss: {:.4f} valAcc: {:.4f}'.format(val_loss, val_acc))
        logger.info('Epoch:[{}/{}] train_acc={:.3f} '.format(epoch, opt.max_epoch,
                                                             epoch_acc_linear))
        #
        model_out_path = model_save_dir + "/" + '{}_'.format(model_name) + str(epoch) + '.pth'
        best_model_out_path = model_save_dir + "/" + '{}_'.format(model_name) + 'best' + '.pth'
        #model_out_path = '{}_'.format(model_name) + str(epoch) + '.pth'
        # Save the best model (by validation accuracy).
        if val_acc > best_score:
            best_score = val_acc
            best_epoch = epoch
            torch.save(model.state_dict(), best_model_out_path)
            logger.info("save best epoch: {} best acc: {}".format(best_epoch, val_acc))
        # Save periodic checkpoints after the warm-up epochs.
        if epoch % opt.save_interval == 0 and epoch > opt.min_save_epoch:
            torch.save(model.state_dict(), model_out_path)
    #
    logger.info('Best acc: {:.3f} Best epoch:{}'.format(best_score, best_epoch))
    time_elapsed = time.time() - since
    logger.info('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    writer.close()
@torch.no_grad()
def val_model(model, criterion):
    """Evaluate `model` on the 'val' split.

    Returns (accuracy, mean loss per sample). Relies on the notebook-level
    globals `opt` and `device`.

    Cleanup: the original accumulated `outPre`/`outLabel`/`running_corrects`
    tensors that were never used after the loop — removed here.
    """
    val_dataset = fuDataset(opt.train_val_data, opt.train_label_csv, phase='val', input_size=opt.input_size)
    val_loader = DataLoader(val_dataset,
                            batch_size=opt.val_batch_size,
                            shuffle=False,
                            num_workers=opt.num_workers)
    dset_sizes = len(val_dataset)
    model.eval()
    running_loss = 0.0
    pres_list = []
    labels_list = []
    for inputs, labels in val_loader:
        labels = labels.type(torch.LongTensor)
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, preds = torch.max(outputs.data, 1)
        loss = criterion(outputs, labels)
        pres_list += preds.cpu().numpy().tolist()
        labels_list += labels.data.cpu().numpy().tolist()
        # Weight batch loss by batch size so the final mean is per-sample.
        running_loss += loss.item() * inputs.size(0)
    #
    val_acc = accuracy_score(labels_list, pres_list)
    return val_acc, running_loss / dset_sizes
#
if __name__ == "__main__":
    # Wire everything together: config, device, loss, logging, model,
    # optimizer, scheduler — then launch training.
    opt = Config()
    torch.cuda.empty_cache()
    #device = torch.device(opt.device)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    criterion = torch.nn.CrossEntropyLoss().to(device)
    model_name = opt.backbone
    model_save_dir = os.path.join(opt.checkpoints_dir, model_name)
    if not os.path.exists(model_save_dir):
        os.makedirs(model_save_dir)
    logger = get_logger(os.path.join(model_save_dir, 'log.log'))
    logger.info('Using: {}'.format(model_name))
    logger.info('InputSize: {}'.format(opt.input_size))
    logger.info('optimizer: {}'.format(opt.optimizer))
    logger.info('lr_init: {}'.format(opt.lr))
    logger.info('batch size: {}'.format(opt.train_batch_size))
    logger.info('criterion: {}'.format(opt.loss))
    logger.info('Using label smooth: {}'.format(opt.use_smooth_label))
    logger.info('lr_scheduler: {}'.format(opt.lr_scheduler))
    logger.info('Using the GPU: {}'.format(str(opt.gpu_id)))
    # Consistency fix: build the model from the config instead of repeating
    # the literals 'xception' / 2.
    model = make_model(opt.backbone, num_classes=opt.num_classes,
                       pretrained=True)
    model.to(device)
    # Consistency fix: take lr / weight_decay from the config (same values
    # as the previous hard-coded 3e-4 / 5e-4).
    optimizer = optim.AdamW(model.parameters(), lr=opt.lr, weight_decay=opt.weight_decay)
    #lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.5)
    # NOTE(review): opt.lr_scheduler says 'exp' but a cosine warm-restart
    # scheduler is actually used — the config string is informational only.
    lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=3, T_mult=2, eta_min=1e-6, last_epoch=-1)
    train_model(model, criterion, optimizer,
                lr_scheduler=lr_scheduler)
#
In [20]:
# Inference: load the epoch-20 checkpoint, predict each test image listed in
# the sample-submission file, and write the submission CSV.
submit=pd.read_csv(r'C:/Users/test/Desktop/logo识别练习赛/data/提交示例文件.csv',header=None)
submit.columns=['name']
model = make_model('{}'.format('xception'), num_classes=2,
                   pretrained=False)
# NOTE(review): hard-coded epoch-20 checkpoint; the best-on-val weights are
# saved as xception_best.pth — confirm which one is intended.
net_weight='./ckpt/xception/xception_20.pth'
model.load_state_dict(torch.load(net_weight))
model = model.to(device)
model.eval()
# Same deterministic preprocessing as the eval branch of fuDataset.
infer_transforms=T.Compose([
    T.Resize((opt.input_size,opt.input_size)),
    T.ToTensor(),
    T.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
])
result=[]
test_dir='C:/Users/test/Desktop/logo识别练习赛/data/test/'
for name in submit['name'].values:
    img_path=os.path.join(test_dir,name)
    data = Image.open(img_path)
    data = data.convert('RGB')  # ensure 3 channels
    data = infer_transforms(data)
    data=data.unsqueeze(0)  # add batch dimension: (1, C, H, W)
    inputs= data.to(device)
    with torch.no_grad():
        outputs = model(inputs)
        _, preds = torch.max(outputs.data, 1)
    result.append(preds.cpu().data.numpy()[0])
# Write the submission: one (name, label) row per test image, no header.
submit['label']=result
submit.to_csv(r'C:/Users/test/Desktop/submit2.csv',index=False,header=None)
In [ ]:
In [ ]:
In [ ]:
Content
Comments
You must login before you can post a comment.