安装

YOLOv8是YOLO系列模型的较新版本。和其它版本不同的是,该仓库并非起名为YOLOv8,而是公司名ultralytics,因为他们想将此版本作为一个通用库,以方便调用和部署。

所以可以直接安装ultralytics的库,也可以选择下载源码:git clone https://github.com/ultralytics/ultralytics.git

数据预处理

由于光线的选择性衰减与水中粒子散射问题,水下图像会表现出颜色扭曲、细节模糊、对比度低的问题。

光线在水中的传播衰减与陆地的传播衰减是不同的,它是一种不均匀的且依赖于波长特性的衰减。红光是可见光中波长最长的,在传播时,红光最先消失,所以水下的照片都会呈现出蓝色或者绿色的色调;同时不同水质中的微小颗粒也会在图像中引入一系列噪声,水体中的粒子对于光有反射作用,当其反射的光到达相机时会对所成的像产生散射效果,散射使成像的细节变得模糊影响图像的质量。

对数据增强的步骤如下:

分别定义了如下函数:

channel_split:用于将给定图像的RGB通道分开,并生成三个新图像,每个图像只包含原图像的一个颜色通道。

compensate_RB:用于对图像进行红色和蓝色通道的补偿。函数通过均衡红色和蓝色通道与绿色通道的差异来达到色彩校正的目的。

gray_world:用于实现图像的灰度世界假设(Gray World Assumption)白平衡算法。该算法假设图像中的平均颜色应该是灰色,并根据这一假设对图像进行颜色校正。

sharpen:用于对图像进行锐化处理,基于高斯模糊和增强对比来提高图像的锐度。

hsv_global_equalization:用于对图像进行全局HSV直方图均衡化。

average_fusion:用于对两幅图像进行平均融合。通过对两幅图像的每个通道像素值取平均值来实现融合。

对图像数据的具体处理过程如下:遍历输入文件夹中的文件,检查文件扩展名是否为图像格式,调用多个图像处理函数对每一个输入图像进行处理,依次为红蓝通道补偿、灰度世界白平衡、图像锐化、全局HSV直方图均衡化和图像融合。最后将处理后的图像保存到指定的输出文件夹中。

import os
from PIL import Image, ImageOps, ImageFilter, ImageStat
import numpy as np


def channel_split(image):
# Split the R, G and B channels
imageR, imageG, imageB = image.split()
x, y = image.size
Rchannel = np.zeros((y, x, 3), dtype="uint8")
Bchannel = np.zeros((y, x, 3), dtype="uint8")
Gchannel = np.zeros((y, x, 3), dtype="uint8")

# Create individual components image
Rchannel[:, :, 0] = imageR;
Bchannel[:, :, 1] = imageG;
Gchannel[:, :, 2] = imageB;

# Convert array to image
Rchannel = Image.fromarray(Rchannel)
Bchannel = Image.fromarray(Bchannel)
Gchannel = Image.fromarray(Gchannel)


def compensate_RB(image, flag):
imager, imageg, imageb = image.split()
minR, maxR = imager.getextrema()
minG, maxG = imageg.getextrema()
minB, maxB = imageb.getextrema()

imageR = np.array(imager, np.float64)
imageG = np.array(imageg, np.float64)
imageB = np.array(imageb, np.float64)
x, y = image.size

for i in range(0, y):
for j in range(0, x):
imageR[i][j] = (imageR[i][j] - minR) / (maxR - minR)
imageG[i][j] = (imageG[i][j] - minG) / (maxG - minG)
imageB[i][j] = (imageB[i][j] - minB) / (maxB - minB)

meanR = np.mean(imageR)
meanG = np.mean(imageG)
meanB = np.mean(imageB)

if flag == 0:
for i in range(y):
for j in range(x):
imageR[i][j] = int((imageR[i][j] + (meanG - meanR) * (1 - imageR[i][j]) * imageG[i][j]) * maxR)
imageB[i][j] = int((imageB[i][j] + (meanG - meanB) * (1 - imageB[i][j]) * imageG[i][j]) * maxB)

for i in range(0, y):
for j in range(0, x):
imageG[i][j] = int(imageG[i][j] * maxG)

if flag == 1:
for i in range(y):
for j in range(x):
imageR[i][j] = int((imageR[i][j] + (meanG - meanR) * (1 - imageR[i][j]) * imageG[i][j]) * maxR)

for i in range(0, y):
for j in range(0, x):
imageB[i][j] = int(imageB[i][j] * maxB)
imageG[i][j] = int(imageG[i][j] * maxG)

compensateIm = np.zeros((y, x, 3), dtype="uint8")
compensateIm[:, :, 0] = imageR;
compensateIm[:, :, 1] = imageG;
compensateIm[:, :, 2] = imageB;

compensateIm = Image.fromarray(compensateIm)

return compensateIm


def gray_world(image):
imager, imageg, imageb = image.split()
imagegray = image.convert('L')
imageR = np.array(imager, np.float64)
imageG = np.array(imageg, np.float64)
imageB = np.array(imageb, np.float64)
imageGray = np.array(imagegray, np.float64)
x, y = image.size

meanR = np.mean(imageR)
meanG = np.mean(imageG)
meanB = np.mean(imageB)
meanGray = np.mean(imageGray)

for i in range(0, y):
for j in range(0, x):
imageR[i][j] = int(imageR[i][j] * meanGray / meanR)
imageG[i][j] = int(imageG[i][j] * meanGray / meanG)
imageB[i][j] = int(imageB[i][j] * meanGray / meanB)

whitebalancedIm = np.zeros((y, x, 3), dtype="uint8")
whitebalancedIm[:, :, 0] = imageR;
whitebalancedIm[:, :, 1] = imageG;
whitebalancedIm[:, :, 2] = imageB;

return Image.fromarray(whitebalancedIm)


def sharpen(wbimage, original):
smoothed_image = wbimage.filter(ImageFilter.GaussianBlur)
smoothedr, smoothedg, smoothedb = smoothed_image.split()
imager, imageg, imageb = wbimage.split()
imageR = np.array(imager, np.float64)
imageG = np.array(imageg, np.float64)
imageB = np.array(imageb, np.float64)
smoothedR = np.array(smoothedr, np.float64)
smoothedG = np.array(smoothedg, np.float64)
smoothedB = np.array(smoothedb, np.float64)
x, y = wbimage.size

for i in range(y):
for j in range(x):
imageR[i][j] = 2 * imageR[i][j] - smoothedR[i][j]
imageG[i][j] = 2 * imageG[i][j] - smoothedG[i][j]
imageB[i][j] = 2 * imageB[i][j] - smoothedB[i][j]

sharpenIm = np.zeros((y, x, 3), dtype="uint8")
sharpenIm[:, :, 0] = imageR;
sharpenIm[:, :, 1] = imageG;
sharpenIm[:, :, 2] = imageB;

return Image.fromarray(sharpenIm)


def hsv_global_equalization(image):
hsvimage = image.convert('HSV')
Hue, Saturation, Value = hsvimage.split()
equalizedValue = ImageOps.equalize(Value, mask=None)
x, y = image.size
equalizedIm = np.zeros((y, x, 3), dtype="uint8")
equalizedIm[:, :, 0] = Hue;
equalizedIm[:, :, 1] = Saturation;
equalizedIm[:, :, 2] = equalizedValue;
hsvimage = Image.fromarray(equalizedIm, 'HSV')
rgbimage = hsvimage.convert('RGB')

return rgbimage


def average_fusion(image1, image2):
image1r, image1g, image1b = image1.split()
image2r, image2g, image2b = image2.split()
image1R = np.array(image1r, np.float64)
image1G = np.array(image1g, np.float64)
image1B = np.array(image1b, np.float64)
image2R = np.array(image2r, np.float64)
image2G = np.array(image2g, np.float64)
image2B = np.array(image2b, np.float64)
x, y = image1R.shape

for i in range(x):
for j in range(y):
image1R[i][j] = int((image1R[i][j] + image2R[i][j]) / 2)
image1G[i][j] = int((image1G[i][j] + image2G[i][j]) / 2)
image1B[i][j] = int((image1B[i][j] + image2B[i][j]) / 2)

fusedIm = np.zeros((x, y, 3), dtype="uint8")
fusedIm[:, :, 0] = image1R;
fusedIm[:, :, 1] = image1G;
fusedIm[:, :, 2] = image1B;

return Image.fromarray(fusedIm)

def process_image(image_path, output_folder):
image = Image.open(image_path)
compensated_image = compensate_RB(image, flag=0)
gray_world_image = gray_world(image)
sharpened_image = sharpen(gray_world_image, image)
hsv_equalized_image = hsv_global_equalization(sharpened_image)
fused_image = average_fusion(compensated_image, hsv_equalized_image)

output_path = os.path.join(output_folder, os.path.basename(image_path))
fused_image.save(output_path)


def process_images_in_folder(input_folder, output_folder):
for filename in os.listdir(input_folder):
if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.gif')):
image_path = os.path.join(input_folder, filename)
process_image(image_path, output_folder)


# 示例用法
input_folder = '/Users/wuleihuan/Desktop/testimages' # 替换为实际的输入文件夹路径
output_folder = '/Users/wuleihuan/Desktop/testimages/augmented' # 替换为实际的输出文件夹路径

if not os.path.exists(output_folder):
os.makedirs(output_folder)

process_images_in_folder(input_folder, output_folder)

数据集在项目内存放路径如下:

ultralytics/
└── dataset/
├── images/
│ └── train/
│ └── val/
└── labels/
└── train/
└── val/

本文省略标签格式转换步骤。

训练集和验证集划分比例为0.8和0.2,划分数据集:

import os
import shutil
import random


def split_dataset(image_dir, label_dir, output_dir, train_ratio=0.8, val_ratio=0.2):
# 创建输出目录
for split in ['train', 'val']:
os.makedirs(os.path.join(output_dir, 'images', split), exist_ok=True)
os.makedirs(os.path.join(output_dir, 'labels', split), exist_ok=True)

# 获取所有图像文件名
image_files = [f for f in os.listdir(image_dir) if os.path.isfile(os.path.join(image_dir, f))]

# 随机打乱文件
random.shuffle(image_files)

# 计算每个数据集的大小
total_count = len(image_files)
train_count = int(total_count * train_ratio)
val_count = int(total_count * val_ratio)

# 划分数据集
train_files = image_files[:train_count]
val_files = image_files[train_count:train_count + val_count]

# 移动文件
def move_files(files, split):
for file in files:
image_path = os.path.join(image_dir, file)
label_path = os.path.join(label_dir, file.replace('.jpg', '.txt'))
shutil.copy(image_path, os.path.join(output_dir, 'images', split, file))
shutil.copy(label_path, os.path.join(output_dir, 'labels', split, file.replace('.jpg', '.txt')))

move_files(train_files, 'train')
move_files(val_files, 'val')


# 设置路径
image_dir = 'dataset/images'
label_dir = 'dataset/labels'
output_dir = 'dataset'

# 划分数据集
split_dataset(image_dir, label_dir, output_dir)

训练

修改ultralytics/cfg/models/v8/yolov8.yaml文件:

在项目根目录下新建my_data.yaml

path: /Users/wuleihuan/ultralytics/dataset 
train: images/train
val: images/val
test:

# Classes
nc: 4
names:
0: holothurian
1: echinus
2: scallop
3: starfish

终端执行命令:

yolo task=detect mode=train data=my_data.yaml model=yolov8s.pt epochs=50 imgsz=640 project=/Users/wuleihuan/ultralytics/runs/detect name=train

训练完成后在weights目录下保存了两个训练时的权重:最新一次保存的last.pt和在验证集上表现最好的best.pt。

混淆矩阵是对分类问题预测结果的总结。

confusion_matrix

confusion_matrix_normalized

F1曲线显示,当置信度阈值在0.2-0.4的区间内,模型在平衡精确率和召回率方面表现较为理想。

F1_curve

PCC图曲线向上并向左弯曲,表示在较低置信度下仍能保持较高的精度,说明检测器在高召回率的同时能够保持低误报率,即对目标的识别准确性较高。

P_curve

当RCC曲线靠近图表的右上角时,说明模型在保持高召回率的同时能够维持较高的精度。在PRC中,曲线越靠近右上角,表示模型在预测时能够同时保证高的精确率和高的召回率,即预测结果较为准确。可以看出,贝类(scallop)的训练效果不太理想。

R_curve

PR_curve

部署

主界面有一个拖选框用于上传图片,两个按钮分别实现选择文件上传和进行检测功能,在上传文件后会显示已上传文件的名称及预览图。后端采用Flask框架,定义了两个路由函数,分别为主界面、检测函数结果显示。

部署的代码不放了,可以修改代码,也在运行的时候指定权重之类的参数:

python predict_api.py --weight /Users/wuleihuan/ultralytics/runs/detect/train2/weights/best.pt

思考

在训练模型之前产生了一点疑问:

问了一下学长,他问我测试集的数据是什么样的,他认为最终的目的肯定是让测试集上的结果好看,增强这一步可以扩充训练集,但直觉上这个扩充不一定会带来好的效果,因为神经网络有些要处理低识别率的数据有些要处理高识别率的,增加了工作量。

其实我也是类似的想法,因为老师给的测试数据集是没有经过增强处理的,我感觉用高清晰度的训练集训练出来的模型可能对低清晰度测试集的检测效果可能并不会更好(还没对比过,只是我的主观臆断),但老师又说建议对数据进行增强预处理,所以我就想不通这一步的作用。

学长答曰:

很好的建议,可惜我手头资源十分紧张,暂时是跑不动了。

又去问了老师,老师的回复是把增强过的图片作为训练集,对于测试集,也可以在检测之前先增强一下。

最终的测试效果是挺好的,我在想能不能把对检测对象的增强这一步放到模型优化里,或者是,假如有一个水下目标检测的系统,能不能在检测之前加入自动增强这一步,毕竟如果要实现摄像头实时检测之类的目的,不能都靠人工手动先对数据做处理。

但是目前来看,增强这一步(尤其是我用的增强方法)的速度比检测还慢,如果真的想要放到水下机器人之类的功能上,还是不能实现实时检测。想看看要怎么实现速率优化,包括硬件和算法等多个方面。