We trained using the source code from a paper by the University of Manchester for the Cough-based COVID-19 Detection Challenge.

Download the dataset from the competition website, then extract it:

%cd /content/
!unzip -q /content/aicv115m_public_train.zip -d ./
!unzip -q /content/aicv115m_public_train/train_audio_files_8k.zip
!unzip -q /content/aicv115m_private_test.zip -d ./

Import libraries

import librosa
import librosa.display
from tqdm import tqdm

import pandas as pd
import numpy as np
import os
from sklearn.model_selection import train_test_split
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import BatchNormalization, Conv2D, Dense, GlobalAveragePooling2D
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import optimizers
from tensorflow.keras.utils import plot_model
from datetime import datetime
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

import matplotlib
import matplotlib.pyplot as plt
import itertools
import pylab

Load the train.csv file

# Load the per-recording training metadata (file paths and assessment labels).
train_df = pd.read_csv("/content/aicv115m_public_train/metadata_train_challenge.csv")
# Notebook-style echo of the dataframe for a quick visual check.
train_df

train.csv

Use the Librosa Python library to extract features

# Reshape the data
# Class names in label order: 0 = not_covid, 1 = covid.
target_names = ['not_covid', 'covid']
# MFCC matrix geometry fed to the CNN: 120 coefficients x 431 frames, 1 channel.
# Must stay in sync with n_mfcc / max_pad_length inside extract_features.
num_rows = 120
num_columns = 431
num_channels = 1

def extract_features(file_name):
    """Load an audio file and convert it to a fixed-size MFCC matrix.

    Parameters
    ----------
    file_name : str
        Path to the audio file to load.

    Returns
    -------
    tuple
        (mfccs, sample_rate) where mfccs is a (120, 431) float array,
        or (None, None) if the file could not be processed.
    """
    # Fixed feature geometry; must match num_rows / num_columns used by the CNN.
    max_pad_length = 431
    n_mfcc = 120
    n_fft = 4096
    hop_length = 512
    n_mels = 512
    try:
        # librosa.load resamples to its default 22050 Hz (no sr= argument given).
        audio, sample_rate = librosa.load(file_name)
        mfccs = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=n_mfcc,
                                     n_fft=n_fft, hop_length=hop_length, n_mels=n_mels)
        if mfccs.shape[1] >= max_pad_length:
            # Clip longer than 431 frames: truncate instead of crashing —
            # the original computed a negative pad_width and np.pad raised.
            mfccs = mfccs[:, :max_pad_length]
        else:
            # Right-pad shorter clips with zeros up to the fixed frame count.
            pad_width = max_pad_length - mfccs.shape[1]
            mfccs = np.pad(mfccs, pad_width=((0, 0), (0, pad_width)), mode='constant')
    except Exception as e:
        # Unreadable / undecodable file: report and signal the caller.
        # The original returned `sample_rate` here, which is undefined when
        # librosa.load itself raised; return None explicitly instead.
        print("Error encountered while parsing file: ", e)
        return None, None
    return mfccs, sample_rate

# Directory that receives the per-file MFCC spectrogram images.
mfcc_image_path = "mfcc"
os.makedirs(mfcc_image_path, exist_ok=True)

def plot_mfcc(filename, mfcc, sr):
    """Render an MFCC matrix as a dB spectrogram and save it as a PNG.

    Parameters
    ----------
    filename : str
        Base name (without extension) for the image written under mfcc_image_path.
    mfcc : np.ndarray
        MFCC matrix, as returned by extract_features.
    sr : int
        Sample rate of the source audio, used to scale the time axis.
    """
    plt.figure(figsize=(10, 4))
    # Convert amplitudes to decibels relative to the peak for a readable plot.
    librosa.display.specshow(librosa.amplitude_to_db(mfcc, ref=np.max),
                             y_axis='mel', x_axis='time', sr=sr)
    plt.colorbar(format='%+2.0f dB')
    plt.title(filename)
    plt.tight_layout()
    # Use the pyplot API consistently; the original mixed in the deprecated
    # pylab interface for saving/closing.
    plt.savefig(os.path.join(mfcc_image_path, filename + '.png'),
                bbox_inches=None, pad_inches=0)
    plt.close()

def process_dataset(df, PLOT_MFCC, audio_dir='/content/train_audio_files_8k/'):
    """Extract MFCC features (and optionally MFCC images) for every row of df.

    Parameters
    ----------
    df : pd.DataFrame
        Metadata with 'file_path' and 'assessment_result' columns.
    PLOT_MFCC : bool
        When True, also save a spectrogram PNG per file via plot_mfcc.
    audio_dir : str
        Directory containing the audio files. Generalized from the previously
        hard-coded path; the default preserves the original behavior.

    Returns
    -------
    tuple
        (X, y): numpy array of MFCC matrices and numpy array of class labels.
    """
    features = []
    for index, row in tqdm(df.iterrows(), total=df.shape[0]):
        file_properties = row["file_path"]
        file_name = audio_dir + file_properties
        class_label = row["assessment_result"]
        data, sr = extract_features(file_name)
        if data is not None:
            features.append([data, class_label])
            # Save an image of the MFCC
            if PLOT_MFCC:
                plot_mfcc(file_properties + '_' + str(class_label), data, sr)
        else:
            # extract_features failed for this file; skip it.
            print("Data is empty: ", file_name)

    # Convert into a Pandas dataframe
    featuresdf = pd.DataFrame(features, columns=['feature', 'class_label'])
    print(featuresdf)
    print('Finished feature extraction from ', len(featuresdf), ' files')

    # Convert features and corresponding classification labels into numpy arrays
    X = np.array(featuresdf.feature.tolist())
    y = np.array(featuresdf.class_label.tolist())
    return X, y

# Extract features for the whole training set; True also writes MFCC images.
X, y = process_dataset(train_df, True)

Split train and valid dataset

# Hold out 10% of the samples for testing; fixed seed keeps the split reproducible.
x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)
# Add a trailing channel axis so each sample is a (rows, cols, 1) "image".
sample_shape = (num_rows, num_columns, num_channels)
x_train = x_train.reshape((-1,) + sample_shape)
x_test = x_test.reshape((-1,) + sample_shape)
print(x_train.shape, x_test.shape, y_train.shape, y_test.shape)

Define the CNN model

def CNN():
    """Build and compile the binary cough-classification CNN.

    Returns
    -------
    keras.Sequential
        Compiled model: stacked Conv2D+BatchNorm blocks, global average
        pooling, and a single sigmoid output for binary classification.
    """
    stack = [
        Conv2D(16, (7, 7), input_shape=(num_rows, num_columns, num_channels),
               activation='relu', padding="same"),
        BatchNormalization(),
        Conv2D(32, (3, 3), activation='relu', padding="same"),
        BatchNormalization(),
        Conv2D(64, (3, 3), activation='relu', padding="same"),
        BatchNormalization(),
        Conv2D(128, (3, 3), activation='relu', padding="same"),
        BatchNormalization(),
        Conv2D(256, (3, 3), activation='relu', padding="same"),
        BatchNormalization(),
        # 1x1 convolution mixes channels before pooling.
        Conv2D(512, (1, 1), activation='relu', padding="same"),
        BatchNormalization(),
        GlobalAveragePooling2D(),
        Dense(1, activation='sigmoid'),
    ]
    model = Sequential(stack)
    # Small learning rate paired with binary cross-entropy for the sigmoid head.
    opt = optimizers.Adam(learning_rate=0.00001)
    model.compile(loss='binary_crossentropy', metrics=['accuracy'], optimizer=opt)
    model.summary()
    return model

# Instantiate and compile the network (also prints its summary).
cnn_model = CNN()

Model

Train model

def train_model(model, x_train, x_test, y_train, y_test):
    """Train the model with checkpointing and early stopping; print accuracies.

    Parameters
    ----------
    model : keras.Model
        Compiled model to train.
    x_train, x_test : np.ndarray
        Train/test feature tensors of shape (N, rows, cols, 1).
    y_train, y_test : np.ndarray
        Binary labels for each split.

    Returns
    -------
    keras.callbacks.History
        Training history (used later for the accuracy/loss plots).
    """
    # Baseline accuracy before any training (should be near chance).
    score = model.evaluate(x_test, y_test, verbose=1)
    accuracy = 100*score[1]
    print("Pre-training accuracy: %.4f%%" % accuracy)
    # Train the model
    num_epochs = 100
    num_batch_size = 10
    start = datetime.now()
    # ModelCheckpoint does not create missing directories; without this the
    # save fails and the later load_model('/content/saved_models/...') breaks.
    os.makedirs('saved_models', exist_ok=True)
    checkpointer = ModelCheckpoint(filepath='saved_models/weights.best.basic_cnn.hdf5', verbose=1, save_best_only=True)
    es_callback = EarlyStopping(monitor='val_loss', patience=10, verbose=1)
    # NOTE(review): reduce_lr is constructed but NOT passed to fit(), so it has
    # no effect; also factor=0.0001 would shrink the LR 10000x per plateau,
    # which looks unintended — confirm before wiring it in.
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.0001, patience=7, verbose=1, mode='auto', min_delta=0.001, cooldown=1, min_lr=0)
    # validation_split takes the LAST 20% of x_train; shuffle=False keeps that
    # slice fixed across epochs.
    history = model.fit(x_train, y_train, batch_size=num_batch_size, epochs=num_epochs, validation_split=0.2, shuffle=False, callbacks = [checkpointer, es_callback], verbose=2)
    duration = datetime.now() - start
    print("Training completed in time: ", duration)
    # Evaluating the model on the training and testing set
    score = model.evaluate(x_train, y_train, verbose=0)
    print("Training Accuracy: ", score[1])
    score = model.evaluate(x_test, y_test, verbose=0)
    print("Testing Accuracy: ", score[1])
    return history

# Run training; keep the History object for the curve plots below.
history = train_model(cnn_model, x_train, x_test, y_train, y_test)

The accuracy and loss result

# Directory for the training-curve and evaluation plots.
# NOTE(review): the original reassigned `mfcc_image_path` here, which would
# silently redirect any later plot_mfcc() output into "plots/"; use a
# distinct name (the PNG paths below hard-code 'plots/' anyway).
plots_path = "plots"
os.makedirs(plots_path, exist_ok=True)

def plot_graphs(history):
    """Save the accuracy and loss curves of a training run as PNGs.

    Parameters
    ----------
    history : keras.callbacks.History
        History returned by model.fit; must contain 'accuracy', 'val_accuracy',
        'loss' and 'val_loss' series.
    """
    # (metric keys, title, y-label, output file, figure-reset call)
    curve_specs = [
        (('accuracy', 'val_accuracy'), 'Model accuracy', 'Accuracy', 'plots/accuracy.png', plt.clf),
        (('loss', 'val_loss'), 'Model loss', 'Loss', 'plots/loss.png', plt.close),
    ]
    for keys, title, ylabel, out_file, reset in curve_specs:
        for key in keys:
            plt.plot(history.history[key])
        plt.title(title)
        plt.ylabel(ylabel)
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Test'], loc='upper left')
        plt.savefig(out_file)
        # clf between figures, close after the last one — same as the original.
        reset()

plot_graphs(history)

Acc Loss

confusion_matrix and classification_report

def plot_classification_report(y_true_arg, y_pred_arg=None, *, x_test=None, y_test=None):
    """Print a classification report and save it as a CSV under plots/.

    NOTE(review): the original parameters were named ``x_test``/``y_test`` but
    the function is called with labels and predictions — they are really
    ``y_true``/``y_pred``. The old keyword names are kept accepted for
    backward compatibility.

    Parameters
    ----------
    y_true_arg : array-like
        Ground-truth labels.
    y_pred_arg : array-like
        Predicted labels.
    """
    # Map legacy keyword names onto the corrected ones.
    y_true = x_test if x_test is not None else y_true_arg
    y_pred = y_test if y_test is not None else y_pred_arg
    # Human-readable report to stdout.
    print(classification_report(y_true, y_pred, target_names=target_names))
    # Machine-readable copy: per-class metrics as rows of a CSV.
    report = classification_report(y_true=y_true, y_pred=y_pred, output_dict=True, target_names=target_names)
    clsf_report = pd.DataFrame(report).transpose()
    clsf_report.to_csv('plots/classification_report.csv', index= True)

def plot_confusion_matrix(cm, target_names, title='Confusion matrix', cmap=None, normalize=True):
    """Render a confusion matrix as a heatmap, saved to plots/confusion_matrix.png.

    Parameters
    ----------
    cm : np.ndarray
        Raw (count-valued) confusion matrix, e.g. from sklearn's confusion_matrix.
    target_names : list of str or None
        Class labels for the axis ticks; ticks are skipped when None.
    title : str
        Figure title.
    cmap : matplotlib colormap or None
        Heatmap colormap; defaults to 'Blues'.
    normalize : bool
        When True, cell text shows row-normalized proportions instead of counts.
    """
    matplotlib.rcParams.update({'font.size': 22})
    # Overall accuracy/misclassification from the RAW counts — computed before
    # any normalization below, deliberately.
    accuracy = np.trace(cm) / float(np.sum(cm))
    misclass = 1 - accuracy
    if cmap is None:
        cmap = plt.get_cmap('Blues')
    plt.figure(figsize=(14, 12))
    # Note: the heatmap itself is drawn from the raw counts; only the cell
    # text (and text-color threshold) uses the normalized values.
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    if target_names is not None:
        tick_marks = np.arange(len(target_names))
        plt.xticks(tick_marks, target_names, rotation=45)
        plt.yticks(tick_marks, target_names)
    if normalize:
        # Row-normalize: each row sums to 1 (per-true-class proportions).
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    # Cells darker than this threshold get white text for contrast.
    thresh = cm.max() / 1.5 if normalize else cm.max() / 2
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        if normalize:
            plt.text(j, i, "{:0.4f}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")
        else:
            plt.text(j, i, "{:,}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label\naccuracy={:0.4f}; misclass={:0.4f}'.format(accuracy, misclass))
    plt.savefig('plots/confusion_matrix.png', bbox_inches = "tight")
    plt.close()

# Reload the best checkpoint written by ModelCheckpoint during training.
model = keras.models.load_model('/content/saved_models/weights.best.basic_cnn.hdf5')
# NOTE(review): predictions and both reports below use the TRAINING split
# (x_train / y_train), not the held-out test split — confirm this is intended.
y_pred = model.predict(x_train)
# Threshold the sigmoid outputs at 0.5 to get hard 0/1 labels.
predictions = (y_pred > 0.5).astype("int32")
predictions = [p for p in predictions]
cm = confusion_matrix(y_train, predictions)
plot_confusion_matrix(cm, target_names)
plot_classification_report(y_train, predictions)

Loss Loss

The private test score at this initial stage is: 0.643352

References

High accuracy classification of COVID-19 coughs using Mel-frequency cepstral coefficients and a Convolutional Neural Network with a use case for smart home devices