Planets Recognition - Deep Learning

 · 20 mins read

Se va a entrenar modelo para reconocer tres planetas: Jupiter, Saturno, Urano. Se trata de un problema supervisado, por lo que necesitamos los datos etiquetados.

import os, shutil
from tensorflow.keras import layers
from tensorflow.keras import models
from tensorflow.keras import optimizers, callbacks
from tensorflow.keras.utils import plot_model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing import image
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from sklearn.metrics import confusion_matrix
Primer paso: obtencion de imagenes de estos tres planetas. 
Esto se ha realizado mediante un scraper, obteniendo 500 imagenes por cada planeta,
se han eliminado algunas imagenes no representativas

!cp ./drive/MyDrive/data.zip .
!unzip -q ./data.zip
num_jupiter = !ls ./data/jupiter | wc -l
num_jupiter = int(num_jupiter[0])
print(num_jupiter)
num_saturn = !ls ./data/saturn | wc -l 
num_saturn = int(num_saturn[0])
print(num_saturn)
num_uranus = !ls ./data/uranus | wc -l
num_uranus = int(num_uranus[0])
print(num_uranus)
Hay unas 440 imagenes por planeta. Ahora creamos el dataset (train, validation, test). 
Este es un problema con pocas imagenes y quiza nos quedamos cortos separando en train, validation y test. Pero es lo que se haria 
en un problema mayor. Otra opcion es implementar una validacion cruzada.
Con conjunto de validacion testeamos nuestros parametros/hiperparametros.
El objetivo ahora es tener la siguiente estructura: dentro del conjunto de datos (planets), crear conjuntos
train, validation, test. Dentro de cada uno de estos, separar los datos por clases (carpetas jupiter, saturn, uranus)
Esto nos permite posteriormente entrenar la red de manera correcta

original_data_dir = './data'
#Creamos carpeta donde se va a preparar datos de train, validation, test
dir_nuevo = './planets'
os.mkdir(dir_nuevo)

#Creamos carpetas train, validation, test dentro de carpeta de datos planets
train_dir = os.path.join(dir_nuevo, 'train')
os.mkdir(train_dir)
validation_dir = os.path.join(dir_nuevo, 'validation')
os.mkdir(validation_dir)
test_dir = os.path.join(dir_nuevo, 'test')
os.mkdir(test_dir)

#Dentro de cada carpeta de train, validation y test, separamos por clase
train_jupiter_dir = os.path.join(train_dir,'jupiter')
os.mkdir(train_jupiter_dir)

train_saturn_dir = os.path.join(train_dir,'saturn')
os.mkdir(train_saturn_dir)

train_uranus_dir = os.path.join(train_dir,'uranus')
os.mkdir(train_uranus_dir)

validation_jupiter_dir = os.path.join(validation_dir,'jupiter')
os.mkdir(validation_jupiter_dir)

validation_saturn_dir = os.path.join(validation_dir,'saturn')
os.mkdir(validation_saturn_dir)

validation_uranus_dir = os.path.join(validation_dir,'uranus')
os.mkdir(validation_uranus_dir)

test_jupiter_dir = os.path.join(test_dir,'jupiter')
os.mkdir(test_jupiter_dir)

test_saturn_dir = os.path.join(test_dir,'saturn')
os.mkdir(test_saturn_dir)

test_uranus_dir = os.path.join(test_dir,'uranus')
os.mkdir(test_uranus_dir)

#Ahora separamos las imagenes de los planetas entre train, validation y test

#Porcentajes de split (se entrena con mas imagenes y se testea con menos)
train = 0.6
validation = 0.2
test = 0.2

#Imagenes de jupiter
fnames = os.listdir( os.path.join(original_data_dir, "jupiter"))
for i, fname in enumerate(fnames):
  src = os.path.join( os.path.join(original_data_dir, "jupiter") , fname)

  if(i<=num_jupiter*train): 
    dst = os.path.join(train_jupiter_dir, fname)
  elif(i>num_jupiter*train and i<num_jupiter*(train+validation)): 
    dst = os.path.join(validation_jupiter_dir, fname)
  else:
    dst = os.path.join(test_jupiter_dir, fname)
  
  shutil.copyfile(src, dst)

#Imagenes de saturno
fnames = os.listdir( os.path.join(original_data_dir, "saturn"))
for i, fname in enumerate(fnames):
  src = os.path.join( os.path.join(original_data_dir, "saturn") , fname)

  if(i<=num_saturn*train): 
    dst = os.path.join(train_saturn_dir, fname)
  elif(i>num_saturn*train and i<num_saturn*(train+validation)): 
    dst = os.path.join(validation_saturn_dir, fname)
  else:
    dst = os.path.join(test_saturn_dir, fname)
  
  shutil.copyfile(src, dst)

#Imagenes de urano
fnames = os.listdir( os.path.join(original_data_dir, "uranus"))
for i, fname in enumerate(fnames):
  src = os.path.join( os.path.join(original_data_dir, "uranus") , fname)

  if(i<=num_uranus*train): 
    dst = os.path.join(train_uranus_dir, fname)
  elif(i>num_uranus*train and i<num_uranus*(train+validation)): 
    dst = os.path.join(validation_uranus_dir, fname)
  else:
    dst = os.path.join(test_uranus_dir, fname)
  
  shutil.copyfile(src, dst)
Mostramos imagen por clase, de conjunto de test

img_jupiter = './planets/test/jupiter/Image_142.jpg'
img_saturn = './planets/test/saturn/Image_227.jpg'
img_uranus = './planets/test/uranus/Image_211.png'

images = [img_jupiter,img_saturn,img_uranus]

for img in images:
  img = image.load_img(img, target_size=(224, 224))
  img_tensor = image.img_to_array(img)
  img_tensor = np.expand_dims(img_tensor, axis=0)
  img_tensor /= 255 
  plt.imshow(img_tensor[0])
  plt.show()
Este momento es muy importante, donde procesamos los datos.

Se aplica normalizacion de los pixeles, con valores pequenios el aprendizaje suele dar mejores resultados.

Ademas, se aplica Data Augmentation. Esto viene muy bien cuando nuestro dataset es pequenio,
puesto que va realizando transformaciones a las imagenes de manera que permite a la red obtener patrones mas generales. 
Ejemplos de transformaciones son rotaciones, ligeros desplazamientos, zoom, cambio de orientacion
Estas solo se aplican a imagenes de entrenamiento pero no a validacion ni test, que son las que nos sirven para comprobar
el rendimiento de la red

El tamanio de imagen es a elegir. No se puede poner un valor muy pequenio porque se puede perder demasiada informacion,
tampoco un valor muy alto porque aumenta en gran medida el numero de parametros, entre otras cosas
Viendo el tamanio de nuestras imagenes originales, 224x224 esta correcto, siguiendo un esquema clasico en redes como VGG-16.

En cuanto al tamanio de batch es conveniente que sea potencia de 2, tambien es otro valor a probar.

train_datagen = ImageDataGenerator(
        rescale=1./255,
        rotation_range=10,
        width_shift_range=0.1,
        height_shift_range=0.1,
        shear_range=0.1,
        zoom_range=0.1,
        horizontal_flip=True)

test_datagen = ImageDataGenerator(rescale=1./255)

train_generator = train_datagen.flow_from_directory(
        train_dir,
        target_size=(224, 224),
        batch_size=32,
        class_mode='categorical')

validation_generator = test_datagen.flow_from_directory(
        validation_dir,
        target_size=(224, 224),
        batch_size=32,
        class_mode='categorical')

test_generator = test_datagen.flow_from_directory(
        test_dir,
        target_size=(224, 224),
        batch_size=32,
        class_mode='categorical')
Una vez se tienen todos los datos listos, se crea el modelo
En cuanto a la estructura de la red, se trata de probar diferentes arquitecturas y analizar los resultados con el conjunto de validacion.
El esquema clasico sigue el patron capa convolucion + pooling.
El tamanio del problema es pequenio y las imagenes tienen un tamanio medio, si ponemos demasiadas capas
el modelo va a ser muy especifico aprendiendose los patrones y va a perder generalizacion, si no ponemos las suficientes capas
no aprendera los patrones necesarios y cuando se llegue a la capa flatten el numero de parametros sera enorme

La funcion de activacion relu se suele poner en las capas intermedias, ofrece buenos resultados.
En la capa de salida hay que poner una funcion de activacion acorde a las salidas del problema,
es decir, si es un problema con una neurona de salida (binario) se pondra funcion de activacion sigmoidal,
en nuestro caso que tenemos tres neuronas de salida, una por planeta, se pondra funcion de activacion softmax

Normalmente la profundidad del mapa de caracteristicas va en aumento por capa (32, 64, 128...)
Tipicamente el slide aplicado en la convolucion es de 3x3 o 5x5


model = models.Sequential()
model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(224, 224, 3)))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(128, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(128, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Flatten())
model.add(layers.Dense(512, activation='relu'))
model.add(layers.Dense(3, activation='softmax'))
model.summary()
#Estructura de la red
plot_model(model, to_file='model_plot.png', show_shapes=True, show_layer_names=False)
A la hora de elegir funcion loss, se tiene que tener en cuenta las salidas del problema, igual que antes.
En este caso se escoge una acorde con softmax

En cuanto a los optimizadores, se trata de ir probando tambien. Mas tipicos son RMSProp o Adam.
La tasa de aprendizaje tambien puede ser otro valor a iterar

model.compile(loss='categorical_crossentropy',
              optimizer=optimizers.RMSprop(learning_rate=1e-4),
              metrics=['acc'])
Con los llamados callbacks podemos guardar el mejor modelo en base a alguna metrica.
En este caso, lo hacemos en relacion al accuracy sobre el conjunto de validacion.

modelCheckCallback = callbacks.ModelCheckpoint(
                        filepath='./model.h5',
                        monitor='val_acc',
                        save_best_only=True,
                    )
Finalmente se procede a entrenar la red.
El numero de epochs es un valor a probar. Es buena practica poner valor alto hasta que converja la red.


history = model.fit(
      train_generator,
      epochs=40,
      validation_data=validation_generator,
      verbose = 2,
      callbacks = [modelCheckCallback])
#Muestra de resultados

test_loss, test_acc = model.evaluate(test_generator, verbose=0)

print("------------RESULTADOS-------------")
print(f"Training accuracy: {history.history['acc'][-1]}")
print(f"Validation accuracy: {history.history['val_acc'][-1]}")
print(f"Test accuracy: {test_acc}")
print(f"Training loss: {history.history['loss'][-1]}")
print(f"Validation loss: {history.history['val_loss'][-1]}")
print(f"Test loss: {test_loss}")
print("----------------------------------")
Mostramos grafica de accuracy

acc = history.history['acc']
val_acc = history.history['val_acc']
epochs = range(1, len(acc)+1)
plt.clf()
plt.grid(linestyle='-',linewidth=1.6, alpha=0.3)
plt.plot(epochs, acc, 'b', linewidth=1.0, label='Training')
plt.plot(epochs, val_acc, 'r', linewidth=1.0, label='Validation')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.savefig('acc.png')
Mostramos grafica de loss

loss_values = history.history['loss']
val_loss_values = history.history['val_loss']

plt.clf()
plt.grid(linestyle='-',linewidth=1.6, alpha=0.3)
plt.plot(epochs, loss_values, 'b', linewidth=1.0, label='Training')
plt.plot(epochs, val_loss_values, 'r', linewidth=1.0, label='Validation')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.savefig('loss.png')
Cuando se trabaja con pocos datos es frecuente que haya fluctuaciones
Mostramos la matriz de confusion para el conjunto de test, observando el rendimiento de la red con datos nuevos

test_generator.shuffle = False
test_generator.index_array = None


test_pred = model.predict(test_generator)
test_pred = (test_pred > 0.5).astype("int32")
test_conf_matrix = confusion_matrix(test_generator.classes, np.argmax(test_pred, axis=1))

plt.clf()
sns.heatmap(test_conf_matrix, xticklabels=['jupiter','saturn','uranus'], yticklabels=['jupiter','saturn','uranus'], linewidths=.2, fmt="")
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.title('Test confusion matrix')
plt.savefig('test_conf_mat.png')
Se puede ver que clasifica correctamente los planetas Saturno y Urano, pero tiene un poco mas de problemas clasificando a Jupiter,
lo confunde con Saturno a veces