'''
---BlockAI/MNIST_TensorFlow_Tutorial Auto Generate Code---
Author : BlockAI
Project Name: MNIST_TensorFlow_Tutorial
Project Link: https://blockai.kr/BlockAI/MNIST_TensorFlow_Tutorial (BlockAI)
Create Date : 2022-11-02
---Requirements---
# 사용자의 환경(OS, CUDA 등)에 따라 라이브러리 버전을 맞춰주세요
pip install tensorflow==2.5.3
pip install tqdm
pip install pandas
pip install scikit-learn
---Folder Structure---
--📂 data
|--📂 input
|--📂 train
|--📁 LABEL_NAME_1
|--🖼️ IMAGE_FILE
|--🖼️ IMAGE_FILE
|--🖼️ ...
|--📁 LABEL_NAME_2
|--🖼️ IMAGE_FILE
|--🖼️ IMAGE_FILE
|--🖼️ ...
|--📁 ...
|--📂 val
|--📁 LABEL_NAME_1
|--🖼️ IMAGE_FILE
|--🖼️ IMAGE_FILE
|--🖼️ ...
|--📁 LABEL_NAME_2
|--🖼️ IMAGE_FILE
|--🖼️ IMAGE_FILE
|--🖼️ ...
|--📁 ...
|--📂 test
|--📁 LABEL_NAME_1
|--🖼️ IMAGE_FILE
|--🖼️ IMAGE_FILE
|--🖼️ ...
|--📁 LABEL_NAME_2
|--🖼️ IMAGE_FILE
|--🖼️ IMAGE_FILE
|--🖼️ ...
|--📁 ...
--📄 MNIST_TensorFlow_Tutorial.py
--📄 MNIST_TensorFlow_Tutorial.ipynb
--📄 requirements.txt
'''
import os
import argparse
import math
import random
from glob import glob
from sklearn import preprocessing
import numpy as np
import tensorflow as tf
path_sep = os.sep
class Dataset(tf.keras.utils.Sequence):
def __init__(self, batch_size, inputs, targets=[], shuffle=False):
self.batch_size = batch_size
self.inputs = np.array(inputs)
self.targets = np.array(targets)
self.shuffle = shuffle
self.indexes = np.arange(len(self.inputs))
if self.shuffle:
np.random.shuffle(self.indexes)
def input_preprocessing(self, image_paths):
images = []
for image_path in image_paths:
# 이미지를 PIL로 불러와서 numpy array로 변환합니다, load_img에 color_mode 등의 옵션을 넣을 수 있습니다
image = tf.keras.preprocessing.image.load_img(image_path)
image = tf.keras.preprocessing.image.img_to_array(image)
# 입력값을 0~1 사이로 변환합니다
image /= 255.0
images.append(image)
return np.array(images)
def __len__(self):
# batch_size에 맞게 한 에폭에 사용되는 데이터 개수를 계산해야합니다
return math.ceil(len(self.inputs) / self.batch_size)
def __getitem__(self, idx):
# 한 스텝에서 batch_size만큼 데이터를 선택해서 return 합니다
start_idx = idx * self.batch_size
# end_idx가 최대 길이를 넘어가지 않게 제한합니다
end_idx = (idx + 1) * self.batch_size if len(self.inputs) > (idx + 1) * self.batch_size else len(self.inputs)
batch_indexes = self.indexes[start_idx:end_idx]
# 이미지 전처리를 수행한 다음 return 합니다
if len(self.targets) == 0:
return self.input_preprocessing(self.inputs[batch_indexes])
else:
return self.input_preprocessing(self.inputs[batch_indexes]), self.targets[batch_indexes]
class Dataloader:
def __init__(self, data_folder, batch_size, shuffle):
self.data_folder = data_folder
self.batch_size = batch_size
self.shuffle = shuffle
self.train_dataset = None
self.val_dataset = None
self.test_dataset = None
self.setup()
def get_inputs_targets(self, data):
inputs = []
targets = []
for data_path in data:
# 이미지 경로를 저장합니다
inputs.append(data_path)
# 이미지 경로에서 타겟 값을 분리하여 저장합니다
targets.append(data_path.split(path_sep)[-2])
return inputs, targets
def setup(self):
# train 폴더에 있는 모든 이미지 경로를 받아와서 데이터셋을 만듭니다
train_data = glob(os.path.join(self.data_folder, 'input', 'train', '*', '*'))
train_inputs, train_targets = self.get_inputs_targets(train_data)
# val 폴더에 있는 모든 이미지 경로를 받아와서 데이터셋을 만듭니다
val_data = glob(os.path.join(self.data_folder, 'input', 'val', '*', '*'))
val_inputs, val_targets = self.get_inputs_targets(val_data)
# test 폴더에 있는 모든 이미지 경로를 받아와서 데이터셋을 만듭니다
test_data = glob(os.path.join(self.data_folder, 'input', 'test', '*', '*'))
test_inputs, test_targets = self.get_inputs_targets(test_data)
self.target_encoder = preprocessing.LabelEncoder()
train_targets = self.target_encoder.fit_transform(train_targets)
val_targets = self.target_encoder.transform(val_targets)
test_targets = self.target_encoder.transform(test_targets)
self.train_dataset = Dataset(self.batch_size, train_inputs, train_targets)
self.val_dataset = Dataset(self.batch_size, val_inputs, val_targets)
self.test_dataset = Dataset(self.batch_size, test_inputs, test_targets)
class Model(tf.keras.Model):
def __init__(self):
super(Model, self).__init__()
self.conv2d_1 = tf.keras.layers.Conv2D(filters=3, kernel_size=1)
self.conv2d_2 = tf.keras.layers.Conv2D(filters=3, kernel_size=1)
self.flatten_1 = tf.keras.layers.Flatten()
self.dense_1 = tf.keras.layers.Dense(units=256)
self.dense_2 = tf.keras.layers.Dense(units=10)
self.categoricalcrossentropy_1 = tf.keras.losses.CategoricalCrossentropy()
def call(self, x):
logits_0 = self.conv2d_1(x)
logits_0 = self.conv2d_2(logits_0)
logits_0 = self.flatten_1(logits_0)
logits_0 = self.dense_1(logits_0)
logits_0 = self.dense_2(logits_0)
return logits_0
def train_step(self, data):
x, y = data
y_preds = self(x)
loss = self.categoricalcrossentropy_1(y, y_preds)
return {'loss': loss}
def test_step(self, data):
x, y = data
y_preds = self(x)
loss = self.categoricalcrossentropy_1(y, y_preds)
return {'loss': loss}
def predict_step(self, data):
x = data
y_preds = self(x)
return y_preds
if __name__ == '__main__':
# https://docs.python.org/ko/3/library/argparse.html
# 하이퍼 파라미터 등 각종 설정값을 입력받습니다
# 터미널 실행 예시 : python3 run.py --batch_size=64 ...
# 실행 시 '--batch_size=64' 같은 인자를 입력하지 않으면 default 값이 기본으로 실행됩니다
parser = argparse.ArgumentParser()
parser.add_argument('--data_folder', default='./data')
parser.add_argument('--batch_size', default=32)
parser.add_argument('--max_epoch', default=300)
parser.add_argument('--shuffle', default=True)
args = parser.parse_args()
dataloader = Dataloader(args.data_folder, args.batch_size, args.shuffle)
model = Model()
# 손실함수와 옵티마이저를 설정합니다
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
model.compile(optimizer=optimizer)
# 모델 학습을 실행합니다
model.fit(dataloader.train_dataset, epochs=args.max_epoch, validation_data=dataloader.val_dataset)
# 테스트 데이터에 타겟값이 있다면 evaluate를, 없다면 predict를 실행해주세요
model.evaluate(dataloader.test_dataset)
preds = model.predict(dataloader.test_dataset)
원본 MNIST데이터셋을 PNG 이미지로 변환하여 활용합니다.
폴더 구조는 ./mnist/train/0/.png, ./mnist/train/1/.png, ... 입니다, 0, 1이 예측할 정답입니다.