'''
---BlockAI/Titanic_Tensorflow_Tutorial Auto Generate Code---
Author : BlockAI
Project Name: Titanic_Tensorflow_Tutorial
Project Link: https://blockai.kr/BlockAI/Titanic_Tensorflow_Tutorial (BlockAI)
Create Date : 2022-11-02
---Requirements---
# 사용자의 환경(OS, CUDA 등)에 따라 라이브러리 버전을 맞춰주세요
pip install tensorflow==2.5.3
pip install tqdm
pip install pandas
pip install scikit-learn
---Folder Structure---
--📂 data
|--📂 input
|--📄 train.*
|--📄 val.*
|--📄 test.*
--📄 Titanic_Tensorflow_Tutorial.py
--📄 Titanic_Tensorflow_Tutorial.ipynb
--📄 requirements.txt
'''
import os
import argparse
import math
from glob import glob
import pandas as pd
from sklearn import preprocessing
import numpy as np
import tensorflow as tf
path_sep = os.sep
class Dataset(tf.keras.utils.Sequence):
def __init__(self, batch_size, inputs, targets=[], shuffle=False):
self.batch_size = batch_size
self.inputs = np.array(inputs)
self.targets = np.array(targets)
self.shuffle = shuffle
self.indexes = np.arange(len(self.inputs))
if self.shuffle:
np.random.shuffle(self.indexes)
def __len__(self):
# batch_size에 맞게 한 에폭에 사용되는 데이터 개수를 계산해야합니다
return math.ceil(len(self.inputs) / self.batch_size)
def __getitem__(self, idx):
# 한 스텝에서 batch_size만큼 데이터를 선택해서 return 합니다
start_idx = idx * self.batch_size
# end_idx가 최대 길이를 넘어가지 않게 제한합니다
end_idx = (idx + 1) * self.batch_size if len(self.inputs) > (idx + 1) * self.batch_size else len(self.inputs)
batch_indexes = self.indexes[start_idx:end_idx]
if len(self.targets) == 0:
return self.inputs[batch_indexes]
else:
return self.inputs[batch_indexes], self.targets[batch_indexes]
class Dataloader:
def __init__(self, data_folder, batch_size, shuffle):
self.data_folder = data_folder
self.batch_size = batch_size
self.shuffle = shuffle
self.train_dataset = None
self.val_dataset = None
self.test_dataset = None
self.set_preprocessing()
self.setup()
def set_preprocessing(self):
data = pd.read_csv(glob(os.path.join(self.data_folder, 'input', 'train.*'))[0], sep=',', index_col=None, encoding='utf-8')
columns = data.columns
self.target_columns = [columns[1]]
# 전처리할 컬럼들을 선택합니다
self.encoder_columns = [columns[0], columns[1], columns[2], columns[3], columns[4], columns[5], columns[6], columns[7]]
self.scaler_columns = [columns[0], columns[1], columns[2], columns[3], columns[4], columns[5], columns[6], columns[7]]
self.delete_columns = [columns[3], columns[4]]
# 미사용 컬럼들을 삭제합니다
data = data.drop(columns=self.delete_columns)
# 빈 값(nan)을 가지고 있는 열(row)을 삭제합니다
data = data.dropna()
# Scikit-learning 전처리 함수를 생성하고 현재 데이터셋에 맞게 설정합니다
self.encoders = {column: preprocessing.LabelEncoder().fit(data[column]) for column in self.encoder_columns}
self.scalers = {column: preprocessing.StandardScaler().fit(data[[column]]) for column in self.scaler_columns}
def preprocessing(self, data):
columns = data.columns.tolist()
# 빈 값(nan)을 가지고 있는 열(row)을 삭제합니다
data = data.dropna()
# 미사용 컬럼들을 삭제합니다
data = data.drop(columns=self.delete_columns)
# https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.LabelEncoder.html
# 선택 컬럼들을 라벨 인코더로 변환합니다
# 추후 encoders[컬럼명].inverse_transform(복원할 값) 함수로 원본 값을 복원할 수 있습니다
for column in self.encoder_columns:
if column in columns:
data[column] = self.encoders[column].transform(data[column])
# https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.StandardScaler.html
# 선택한 스케일러로 해당 컬럼들을 변환합니다
# 추후 scaler.inverse_transform(복원할 값) 함수로 원본 값을 복원할 수 있습니다
for column in self.scaler_columns:
if column in columns:
data[column] = self.scalers[column].transform(data[[column]])
# 타겟 데이터가 없으면 빈 배열을 리턴합니다.
try:
targets = data[self.target_columns].values.tolist()
inputs = data.drop(self.target_columns, axis=1).values.tolist()
except:
targets = []
inputs = data.values.tolist()
return inputs, targets
def setup(self):
train_data = pd.read_csv(glob(os.path.join(self.data_folder, 'input', 'train.*'))[0], sep=',', index_col=None, encoding='utf-8')
# 학습데이터 준비
train_inputs, train_targets = self.preprocessing(train_data)
# 검증데이터 준비
val_data = pd.read_csv(glob(os.path.join(self.data_folder, 'input', 'val.*'))[0], sep=',', index_col=None, encoding='utf-8')
val_inputs, val_targets = self.preprocessing(val_data)
# train 데이터만 shuffle을 적용해줍니다, 필요하다면 val, test 데이터에도 shuffle을 적용할 수 있습니다
self.train_dataset = Dataset(self.batch_size, train_inputs, train_targets, self.shuffle)
self.val_dataset = Dataset(self.batch_size, val_inputs, val_targets)
# 평가데이터 준비
test_data = pd.read_csv(glob(os.path.join(self.data_folder, 'input', 'test.*'))[0], sep=',', index_col=None, encoding='utf-8')
test_inputs, test_targets = self.preprocessing(test_data)
self.test_dataset = Dataset(self.batch_size, test_inputs, test_targets)
class Model(tf.keras.Model):
def __init__(self):
super(Model, self).__init__()
self.dense_1 = tf.keras.layers.Dense(units=128)
self.dropout_1 = tf.keras.layers.Dropout(rate=0.2)
self.relu_1 = tf.keras.layers.ReLU()
self.dense_2 = tf.keras.layers.Dense(units=1)
self.binarycrossentropy_1 = tf.keras.losses.BinaryCrossentropy()
def call(self, x):
logits_0 = self.dense_1(x)
logits_0 = self.dropout_1(logits_0)
logits_0 = self.relu_1(logits_0)
logits_0 = self.dense_2(logits_0)
return logits_0
def train_step(self, data):
x, y = data
y_preds = self(x)
loss = self.binarycrossentropy_1(y, y_preds)
return {'loss': loss}
def test_step(self, data):
x, y = data
y_preds = self(x)
loss = self.binarycrossentropy_1(y, y_preds)
return {'loss': loss}
def predict_step(self, data):
x = data
y_preds = self(x)
return y_preds
if __name__ == '__main__':
# https://docs.python.org/ko/3/library/argparse.html
# 하이퍼 파라미터 등 각종 설정값을 입력받습니다
# 터미널 실행 예시 : python3 run.py --batch_size=64 ...
# 실행 시 '--batch_size=64' 같은 인자를 입력하지 않으면 default 값이 기본으로 실행됩니다
parser = argparse.ArgumentParser()
parser.add_argument('--data_folder', default='./data')
parser.add_argument('--batch_size', default=32)
parser.add_argument('--max_epoch', default=100)
parser.add_argument('--shuffle', default=True)
args = parser.parse_args()
dataloader = Dataloader(args.data_folder, args.batch_size, args.shuffle)
model = Model()
# 손실함수와 옵티마이저를 설정합니다
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
model.compile(optimizer=optimizer)
# 모델 학습을 실행합니다
model.fit(dataloader.train_dataset, epochs=args.max_epoch, validation_data=dataloader.val_dataset)
# 테스트 데이터에 타겟값이 있다면 evaluate를, 없다면 predict를 실행해주세요
model.evaluate(dataloader.test_dataset)
preds = model.predict(dataloader.test_dataset)
./titanic 폴더 아래에 train.csv와 test.csv를 넣어서 사용해주세요.
가이드를 위한 프로젝트이기때문에 모델의 예측능력은 없습니다.