Štěpán Pešout
Czech University of Life Sciences Prague
31/03/2023
This source code was used as a part of my master's thesis, which deals with using machine learning to reduce the risks of psychedelic substance use. For a full understanding of the problem addressed, it is advisable to read the original thesis (only available in Czech).
The code experiments with machine learning models, optimizing their input parameters and analyzing their results. It may at times be hard to read or appear disorganised, mainly because it was not originally intended for publication. It is also not executable on its own, as it requires an input data file to run. However, the original data is subject to confidentiality and cannot be made public.
What matters about the original data file is that it has been cleaned and that all input variables have been converted to ordinal and numeric types. Empty values have been filled in, so the file contains no NULLs. All these modifications are described in my thesis (chapter 4.2, page 39).
The project mainly uses NumPy, which streamlines calculations and enables efficient work with arrays; scikit-learn, for building various machine learning models; and Keras, for creating neural networks.
import csv
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.tree import DecisionTreeClassifier, export_text
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.linear_model import LogisticRegression
from sklearn.feature_selection import SelectFromModel
from sklearn.feature_selection import RFE
from sklearn.naive_bayes import GaussianNB
from sklearn.preprocessing import MinMaxScaler
from sklearn import svm
import random
from matplotlib import pyplot as plt
from tensorflow import keras
from tensorflow.keras.models import Sequential, model_from_json
from tensorflow.keras.layers import Activation, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import sparse_categorical_crossentropy
The file is loaded and the header (column names) is extracted from it. After that, the data is split into test and training sets (200 instances for the test set, the rest for training); the 200 test instances represent approximately 21% of the dataset.
The training and test sets are stored without class membership information, which is kept in separate arrays. In this case there are 5 different classes (obn, ded, vrs, eii, edi), stored in 5 NumPy arrays.
Ego inflation (EII) is kept here only to maintain compatibility with the prepared dataset; it is not used further in the thesis.
data = []
target = []
with open('user_subst_exp_rand.csv', newline='') as csvfile:
    file = csv.reader(csvfile, delimiter=',')
    for i in file: data.append(i)
header = data[0][3:][:-5] # header without class names and ids
data = np.array(data[1:])
# split train and test sets
data_trn = data[200:]
data_tst = data[:200]
obn, ded, vrs, eii, edi = data.transpose()[-5:].astype('int32') # get class values (all)
data = data.transpose()[3:][:-5].transpose().astype('int32') # remove classes and ids (all)
obn_trn, ded_trn, vrs_trn, eii_trn, edi_trn = data_trn.transpose()[-5:].astype('int32') # get class values (train)
data_trn = data_trn.transpose()[3:][:-5].transpose().astype('int32') # remove classes and ids (train)
obn_tst, ded_tst, vrs_tst, eii_tst, edi_tst = data_tst.transpose()[-5:].astype('int32') # get class values (test)
data_tst = data_tst.transpose()[3:][:-5].transpose().astype('int32') # remove classes and ids (test)
This code visualizes a specific class using a histogram (ded in this example).
plt.rcParams.update({'figure.figsize':(3,3), 'figure.dpi':100})
# note: this loop replaces some of the low values with random ones in place (it modifies ded itself)
for i in range(0, len(ded)):
    if (ded[i] < 5 and i % 2 == 0): ded[i] = random.randint(0, 20)
plt.hist(ded, bins=50, color='dimgrey')
plt.gca().set(ylabel='Frequency', xlabel='Value', title='Dread of ego dissolution');
The tertiles function returns the two tertile boundaries of the class values.
The classValues function returns an array representing the membership of each original value in tertile 0, 1 or 2 (low, mid or high).
def tertiles(array):
    return np.round(np.array(pd.Series(array).quantile([1/3, 2/3]).values)).astype('int32')

def classValues(array, numbers=False):
    res = []
    t = tertiles(array)
    for i in array:
        if (i < t[0]):
            if (numbers): res.append('0')
            else: res.append('low')
        if (i >= t[0] and i < t[1]):
            if (numbers): res.append('1')
            else: res.append('mid')
        if (i >= t[1]):
            if (numbers): res.append('2')
            else: res.append('high')
    if (numbers): return np.array(res).astype('int32')
    return np.array(res)
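To illustrate both helpers, here is a call on made-up values (not taken from the real dataset); with the default linear interpolation of pandas' quantile, the rounded tertile boundaries come out as 8 and 14:
example = np.array([2, 15, 7, 30, 11, 4, 22, 9])  # hypothetical values
print(tertiles(example))                  # [ 8 14]
print(classValues(example))               # ['low' 'high' 'low' 'high' 'mid' 'low' 'high' 'mid']
print(classValues(example, numbers=True)) # [0 2 0 2 1 0 2 1]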
Degrees of correlation with the (numerically represented) predicted variables are the main information considered when removing unimportant attributes from the dataset in the following parts of the code. The attribute selection topic is also addressed in chapter 4.7 on page 53 of the original thesis.
The CORR_ lists store the correlation coefficient for each combination of a predicted variable and an input attribute.
CORR_OBN = []
CORR_DED = []
CORR_VRS = []
CORR_EDI = []
for i in range(0, len(data.transpose())):
    CORR_OBN.append([i, np.corrcoef(data.transpose()[i], obn)[0, 1], header[i]])
    CORR_DED.append([i, np.corrcoef(data.transpose()[i], ded)[0, 1], header[i]])
    CORR_VRS.append([i, np.corrcoef(data.transpose()[i], vrs)[0, 1], header[i]])
    CORR_EDI.append([i, np.corrcoef(data.transpose()[i], edi)[0, 1], header[i]])
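For a quick overview, the lists can be sorted by the absolute value of the coefficient to show which attributes correlate most strongly with a class (an inspection sketch only; the actual output depends on the private dataset):
# print the five attributes most strongly correlated with ded (sign ignored)
for idx, coef, name in sorted(CORR_DED, key=lambda x: abs(x[1]), reverse=True)[:5]:
    print(idx, round(coef, 3), name)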
GEA is used to solve the optimization problem of finding an appropriate initial setting for machine learning algorithms (chapters 3.3.10 and 4.6 on pages 36 and 53).
The following functions implement logical operations on a binary defined genome and deal with crossover including mutation of individuals in a population.
The calculation of the fitness function (an individual's chance of survival), the creation of an initial population and the selection of attributes used in the learning process are also addressed here.
# Binary AND for two binary genomes of the same length represented by strings
def b_and(x, y):
    if (len(x) != len(y)): return None
    return bin(int(x, 2) & int(y, 2)).replace('0b', '').zfill(len(x))

# Binary OR for two binary genomes of the same length represented by strings
def b_or(x, y):
    if (len(x) != len(y)): return None
    return bin(int(x, 2) | int(y, 2)).replace('0b', '').zfill(len(x))

# Binary NOT for a binary genome represented by a string
def b_not(x):
    return bin(int(x, 2) ^ int('1' * len(x), 2)).replace('0b', '').zfill(len(x))

# Changes one bit in a genome with 25% probability
def mutate(x):
    rnd = random.randrange(len(x))
    if (random.randrange(4) == 3): return x[:rnd] + str(int(x[rnd])^1) + x[rnd+1:]
    else: return x

# Creates two offspring of two parent binary genomes with a possible mutation
# Uses a random mask (determines which parts of the genome are inherited from which parent)
def crossover(x, y):
    if (len(x) != len(y)): return None
    m = ''.join(random.choices(['0', '1'], k=len(x)))
    return [
        mutate(b_or(b_and(x, m), b_and(y, b_not(m)))),
        mutate(b_or(b_and(y, m), b_and(x, b_not(m))))
    ]
# Similar to the previous function, but creates four offspring using two different masks
def crossover4(x, y):
    if (len(x) != len(y)): return None
    m = ''.join(random.choices(['0', '1'], k=len(x)))
    m1 = ''.join(random.choices(['0', '1'], k=len(x)))
    return [
        mutate(b_or(b_and(x, m), b_and(y, b_not(m)))),
        mutate(b_or(b_and(y, m), b_and(x, b_not(m)))),
        mutate(b_or(b_and(x, m1), b_and(y, b_not(m1)))),
        mutate(b_or(b_and(y, m1), b_and(x, b_not(m1))))
    ]

# Decimal to binary conversion
def dec2bin(decimal, places):
    return bin(decimal).replace('0b', '').zfill(places)

# Binary to decimal conversion
def bin2dec(binary):
    return int(binary, 2)
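A short demonstration of the genome helpers on two hand-written genomes (the crossover results vary because of the random mask and the possible mutation):
x = '101100101'
y = '010011010'
print(b_and(x, y))  # 000000000
print(b_or(x, y))   # 111111111
print(b_not(x))     # 010011010
print(crossover(x, y))               # two random offspring, each 9 bits long
print(dec2bin(bin2dec('01101'), 5))  # 01101 (round trip)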
# The fitness function is defined here as the accuracy of a trained model for a selected setting
# genome: a binary string, which represents an initial configuration
# lengths: a list defining the division of the genome into parts which determine each setting
# function: a machine learning algorithm
# extended: determines the output format (normal or extended)
def fitness(genome, lengths, function, extended):
    if (sum(lengths) != len(genome)): return None
    g_split = []
    for l in lengths:
        g_split.append(genome[:l])
        genome = genome[l:]
    list_decimal = list(map(bin2dec, g_split))
    if (extended): return [function(list_decimal), list_decimal]
    return function(list_decimal)
# Generates individuals for an initial population
# init_size: number of generated individuals
# lengths: a list defining the genome division (the sum of the elements equals the total genome length)
def generatePopulation(init_size, lengths):
    population = []
    for i in range(init_size):
        rand = ''
        for j in range(sum(lengths)): rand += str(random.randrange(2))
        population.append(rand)
    return population
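For example, a population of five genomes for a single 9-bit parameter can be generated and evaluated as follows. The score function here is a toy stand-in used only to show the mechanics, since the real score functions are defined further below:
def toy_score(params): return params[0] / 511  # hypothetical score: normalizes a 9-bit value to [0, 1]
population = generatePopulation(5, [9])
for genome in population:
    print(genome, fitness(genome, [9], toy_score, extended=False))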
# Removes uncorrelated attributes from a given dataset
# data_source: a dataset for calculating the correlation values
# data_target: a dataset from which the uncorrelated attributes will be discarded
# threshold: the lowest absolute value of the correlation coefficient for keeping an attribute in the dataset
def removeUncorrelated(data_source, data_target, threshold):
    to_remove = []
    for i in range(0, len(data_source.transpose())):
        corr = np.corrcoef(data_source.transpose()[i], ded)  # note: correlation is computed against the global ded class values
        if (abs(corr[0, 1]) < threshold): to_remove.append(i)
    return np.delete(data_target, to_remove, 1)
# Similar to removeUncorrelated, but uses precomputed lists of correlation coefficients
# class_corr: a precomputed list with the correlation coefficients (CORR_*)
# data_target: a dataset from which the uncorrelated attributes will be discarded
# threshold: the lowest absolute value of the correlation coefficient for keeping an attribute in the dataset
def removeUncorrelated2(class_corr, data_target, threshold):
    to_remove = []
    for i in range(0, len(class_corr)):
        if (abs(class_corr[i][1]) < threshold): to_remove.append(i)
    return np.delete(data_target, to_remove, 1)
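A hypothetical call, keeping only the attributes whose correlation with ded reaches at least 0.1 in absolute value (how many columns survive depends on the private data):
train_reduced = removeUncorrelated2(CORR_DED, data_trn, 0.1)
print(data_trn.shape, '->', train_reduced.shape)  # fewer columns, same number of rows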
The following functions are called inside the fitness function and use machine learning algorithms from scikit-learn and Keras. The available settings of the ML algorithms and their descriptions are not presented here; they can be found in the official documentation.
All of the functions take a list that specifies the initial configuration, including the minimum level of correlation required to keep an attribute in the dataset. Further arguments contain the class values (for the training and test data), and the last one is a precomputed list with the correlation coefficients.
All arguments besides the initial configuration have default values. This simplifies function calls, but the defaults have to be changed here if necessary.
The implemented functions return accuracy, which is further used for GEA purposes. In addition to this metric, the thesis also works with extended accuracy (its calculation is prepared, commented out, in each function). This concept is presented in chapters 3.3.2 and 4.5 on pages 25 and 53.
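In short, extended accuracy treats every prediction except the two extreme confusions (low predicted as high and vice versa) as acceptable. A small numeric illustration with a made-up confusion matrix of 200 test instances:
cm = np.array([[50, 12,  5],   # hypothetical values; rows = true low/mid/high,
               [10, 40, 15],   # columns = predicted low/mid/high
               [ 4, 14, 50]])
accuracy = (cm[0][0] + cm[1][1] + cm[2][2]) / cm.sum()      # 140/200 = 0.70
ext_accuracy = (cm.sum() - cm[0][2] - cm[2][0]) / cm.sum()  # 191/200 = 0.955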
The following code uses a simple decision tree algorithm.
# params[0]: random state
# params[1]: min_samples_leaf
# params[2]: max_depth
# params[3]: minimal correlation level for an attribute to retain / 1000
def tree_score(
    params,                   # an initial configuration
    class_var_trn = ded_trn,  # class values for the train dataset
    class_var_tst = ded_tst,  # class values for the test dataset
    class_corr = CORR_DED     # the precomputed list with the correlation coefficients
):
    if (params[1] == 0 or params[2] == 0): return 0
    train = removeUncorrelated2(class_corr, data_trn, params[3] / 1000)
    test = removeUncorrelated2(class_corr, data_tst, params[3] / 1000)
    if (len(train.transpose()) == 0): return 0
    tree = DecisionTreeClassifier(random_state=params[0], min_samples_leaf=params[1], max_depth=params[2])
    tree.fit(train, classValues(class_var_trn))
    cm = confusion_matrix(classValues(class_var_tst), tree.predict(test), labels=['low', 'mid', 'high'])
    accuracy = (cm[0][0] + cm[1][1] + cm[2][2]) / len(test)
    # ext_accuracy = (cm.sum() - cm[0][2] - cm[2][0]) / len(test)
    # ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['low', 'mid', 'high']).plot()
    return accuracy
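A hypothetical direct call (outside the GEA) with random_state 1, min_samples_leaf 5, max_depth 4 and a correlation threshold of 80/1000 = 0.08; the remaining score functions below are called analogously:
print(tree_score([1, 5, 4, 80]))  # accuracy on the test set for this configuration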
This function uses a random forest classifier.
# params[0]: random state
# params[1]: n_estimators
# params[2]: minimal correlation level for an attribute to retain / 1000
def rfc_score(
    params,                   # an initial configuration
    class_var_trn = ded_trn,  # class values for the train dataset
    class_var_tst = ded_tst,  # class values for the test dataset
    class_corr = CORR_DED     # the precomputed list with the correlation coefficients
):
    if (params[1] == 0): return 0
    train = removeUncorrelated2(class_corr, data_trn, params[2] / 1000)
    test = removeUncorrelated2(class_corr, data_tst, params[2] / 1000)
    if (len(train.transpose()) == 0): return 0
    rfc = RandomForestClassifier(random_state=params[0], n_estimators=params[1])
    rfc.fit(train, classValues(class_var_trn))
    cm = confusion_matrix(classValues(class_var_tst), rfc.predict(test), labels=['low', 'mid', 'high'])
    accuracy = (cm[0][0] + cm[1][1] + cm[2][2]) / len(test)
    # ext_accuracy = (cm.sum() - cm[0][2] - cm[2][0]) / len(test)
    # ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['low', 'mid', 'high']).plot()
    return accuracy
The following function uses Gaussian Naive Bayes.
# params[0]: minimal correlation level for an attribute to retain / 1000
def gnb_score(
    params,                   # an initial configuration
    class_var_trn = ded_trn,  # class values for the train dataset
    class_var_tst = ded_tst,  # class values for the test dataset
    class_corr = CORR_DED     # the precomputed list with the correlation coefficients
):
    train = removeUncorrelated2(class_corr, data_trn, params[0] / 1000)
    test = removeUncorrelated2(class_corr, data_tst, params[0] / 1000)
    if (len(train.transpose()) == 0): return 0
    gnb = GaussianNB()
    gnb.fit(train, classValues(class_var_trn))
    cm = confusion_matrix(classValues(class_var_tst), gnb.predict(test), labels=['low', 'mid', 'high'])
    accuracy = (cm[0][0] + cm[1][1] + cm[2][2]) / len(test)
    # ext_accuracy = (cm.sum() - cm[0][2] - cm[2][0]) / len(test)
    # ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['low', 'mid', 'high']).plot()
    return accuracy
C-Support Vector Classification is applied here.
# params[0]: minimal correlation level for an attribute to retain / 1000
# params[1]: kernel
# params[2]: degree (for poly)
# params[3]: gamma / 10 (kernel coefficient for poly)
# params[4]: C-value / 10
def svc_score(
    params,                   # an initial configuration
    class_var_trn = ded_trn,  # class values for the train dataset
    class_var_tst = ded_tst,  # class values for the test dataset
    class_corr = CORR_DED     # the precomputed list with the correlation coefficients
):
    if (params[2] == 0): return 0
    if (params[3] == 0): return 0
    if (params[4] == 0): return 0
    train = removeUncorrelated2(class_corr, data_trn, params[0] / 1000)
    test = removeUncorrelated2(class_corr, data_tst, params[0] / 1000)
    if (len(train.transpose()) == 0): return 0
    # params[1] is expected to be encoded by a single genome bit (0 = linear, 1 = poly)
    if (params[1] == 0): k = 'linear'
    if (params[1] == 1): k = 'poly'
    svc = svm.SVC(kernel=k, degree=params[2], gamma=params[3]/10, C=params[4]/10)
    svc.fit(train, classValues(class_var_trn))
    cm = confusion_matrix(classValues(class_var_tst), svc.predict(test), labels=['low', 'mid', 'high'])
    accuracy = (cm[0][0] + cm[1][1] + cm[2][2]) / len(test)
    # ext_accuracy = (cm.sum() - cm[0][2] - cm[2][0]) / len(test)
    # ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['low', 'mid', 'high']).plot()
    return accuracy
The most complex classifier used is a neural network with three layers.
# params[0]: minimal correlation level for an attribute to retain / 1000
# params[1]: number of neurons in the first layer
# params[2]: number of neurons in the second layer
# params[3]: learning_rate / 1000
# params[4]: number of epochs
# params[5]: batch_size
def neural_score(
    params,                   # an initial configuration
    class_var_trn = ded_trn,  # class values for the train dataset
    class_var_tst = ded_tst,  # class values for the test dataset
    class_corr = CORR_DED     # the precomputed list with the correlation coefficients
):
    if (params[1] < 8): return 0
    if (params[2] < 8): return 0
    if (params[3] < 8): return 0
    if (params[4] < 50): return 0
    if (params[5] < 30): return 0
    train = removeUncorrelated2(class_corr, data_trn, params[0] / 1000)
    test = removeUncorrelated2(class_corr, data_tst, params[0] / 1000)
    if (len(train.transpose()) == 0): return 0
    model = Sequential([
        Dense(units=params[1], input_shape=(len(train.transpose()),), activation='relu'),
        Dense(units=params[2], activation='relu'),
        Dense(units=3, activation='softmax')
    ])
    model.compile(
        optimizer=Adam(learning_rate=params[3]/1000),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    model.fit(
        x=minmax(train),
        y=classValues(class_var_trn, numbers=True),
        batch_size=params[5],
        epochs=params[4],
        verbose=0
    )
    predictions = model.predict(minmax(test))
    rounded_predictions = np.argmax(predictions, axis=-1)
    # the classes are numeric here, so numeric labels have to be used for the confusion matrix
    cm = confusion_matrix(classValues(class_var_tst, True), rounded_predictions, labels=[0, 1, 2])
    accuracy = (cm[0][0] + cm[1][1] + cm[2][2]) / len(test)
    # ext_accuracy = (cm.sum() - cm[0][2] - cm[2][0]) / len(test)
    # ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['low', 'mid', 'high']).plot()
    return accuracy
The genetic algorithm first generates an initial population and calculates the fitness function value for each individual. The individuals are sorted accordingly.
A survival probability is then assigned to each individual. The rank selection technique is used here, and a random number of individuals (at least half of the population) continues to the crossover process; see the sketch below. The offspring population forms a new generation and the whole process repeats.
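For a population of five, the rank-based selection probabilities come out as follows (a minimal numeric sketch of the formula used in the loop below; the population size is made up):
n = 5                          # hypothetical population size
prob_base = (n * (n + 1)) / 2  # 15
print([(i + 1) / prob_base for i in range(n)])
# approximately [0.067, 0.133, 0.2, 0.267, 0.333]; the fittest individual (last after sorting) is the most likely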
The algorithm prints the best individual (best setting found) from each population. At the end of the run, it returns the best one of all populations.
GEA works with initially defined parameters: LENGTHS, FUNCTION, INIT_PPL_SIZE and MAX_GENERATIONS. Their meaning is described in the code below.
# LENGTHS = [9]
# FUNCTION = gnb_score
# INIT_PPL_SIZE = 10
# MAX_GENERATIONS = 23
# LENGTHS = [9, 10, 8]
# FUNCTION = rfc_score
# INIT_PPL_SIZE = 10
# MAX_GENERATIONS = 13
# LENGTHS = [7, 7, 8, 5, 8, 8]
# FUNCTION = neural_score
# INIT_PPL_SIZE = 20
# MAX_GENERATIONS = 8
LENGTHS = [9, 5, 7, 8] # number of bits for each ML algorithm parameter (e.g. 9 = 2^9 setting options)
FUNCTION = tree_score # previously defined score function, which uses a ML algorithm
INIT_PPL_SIZE = 20 # number of individuals in the initial population
MAX_GENERATIONS = 18 # maximum number of generations
total_best = [0]
for generation in range(MAX_GENERATIONS):
    # generate a random population for the first generation
    if (generation == 0): new_population = generatePopulation(INIT_PPL_SIZE, LENGTHS)
    # get the fitness value for each individual
    population = []
    for i in new_population: population.append([i, fitness(i, LENGTHS, FUNCTION, extended=False), 0])
    # sort by the fitness value
    population.sort(key=lambda x: x[1])
    # assign a probability to be chosen to every individual
    # rank selection is a suitable choice due to the similar fitness values
    prob_base = (len(population) * (len(population) + 1)) / 2
    for i in range(len(population)):
        population[i][2] = (i + 1) / prob_base
    # print the best individual and update total_best (= the best individual of all generations)
    best = fitness(population[-1][0], LENGTHS, FUNCTION, extended=True)
    if (best[0] > total_best[0]): total_best = best
    print(generation, 'best', population[-1][0], best)
    # get the randomly chosen individuals (without replacement) for crossover
    selected = np.random.choice(
        np.array(population)[:,0], # get the genomes only
        size = random.randint(int(len(population) / 4), int(len(population) / 2)) * 2,
        # size is at least half of the population and at most equal to it
        replace = False,
        p = np.array(population)[:,2].astype('float64') # get the weights from the population array
    )
    # crossover individuals to get offspring population (0th with 1st, 2nd with 3rd etc.)
    children = []
    ctr = 0
    while (ctr < len(selected)-1):
        children.append(crossover4(selected[ctr], selected[ctr + 1])) # crossover4 produces four offspring per pair
        ctr += 2
    new_population = np.array(children).flatten()
    if (len(new_population) == 2): break # stop if the population has become too small
    print('new population size', len(new_population))
    print('-------------------------------')
print('all generation best', total_best)
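Since fitness with extended=True returns the accuracy together with the decoded parameters, the winning configuration stored in total_best can be re-applied directly (a minimal sketch, assuming at least one generation improved on the initial value):
best_accuracy, best_params = total_best  # total_best = [accuracy, [param0, param1, ...]]
print(FUNCTION(best_params))             # re-runs the score function with the best setting found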
A function that rescales the dataset so that all attribute values lie between 0 and 1. Data modified in this way is suitable for training various ML algorithms.
Note that this function is called by neural_score above, so it has to be defined before that function is run.
def minmax(dataset):
    dataset = np.array(dataset).transpose()
    res = []
    for i in dataset:
        res.append(MinMaxScaler(feature_range=(0,1)).fit_transform(i.reshape(-1,1)).flatten())
    return np.array(res).transpose()
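A quick check on a made-up 3x2 array; each column is scaled independently:
example = np.array([[1, 100], [2, 200], [3, 400]])
print(minmax(example))
# [[0.         0.        ]
#  [0.5        0.33333333]
#  [1.         1.        ]]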