Jared AI Hub
Published on

Understanding Xgboost Model from Foundations

Authors
  • avatar
    Name
    Jared Chung
    Twitter

Introduction

Welcome to the world of XGBoost! If you're a data scientist or machine learning enthusiast, you've probably heard of this powerful tool for boosting tree models.

But what exactly is XGBoost, and how does it work? In this blog, we'll delve into the inner workings of XGBoost, and even create our own version from scratch using Numpy.

Understanding the Algorithm

Flow-chart-of-XGBoost

XGBoost stands for "eXtreme Gradient Boosting," and it's a boosting algorithm that combines the predictions of multiple weak learners to create a strong, accurate model. Boosting algorithms work by building a series of weak models and adding them together to form a strong one. In the case of XGBoost, these weak models are decision trees.

So how does XGBoost differ from other boosting algorithms? One key difference is that it uses gradient descent to optimize the weak models. This means that it can handle missing values and large amounts of data more efficiently than other boosting algorithms. XGBoost also has a number of additional features, such as regularization and early stopping, which can improve model performance and prevent over-fitting.

Now rather than importing the actual XGBoost package I was interested in looking at simpler breakdown of the code to understanding the inner workings, fortunately there is convenient code we can look at XGBoost-From-Scratch which recreates the model in Numpy and Pandas making it easy to breakdown

Coding the XGBoost

Setup

First off lets import the necessary packages, as well as create some helper functions to assist us in evaluation and splitting the data.

import numpy as np
import pandas as pd
from sklearn import datasets
from math import e

def train_test_split(X, y, test_size=0.5, shuffle=True, seed=None):
    """ Split the data into train and test sets """
    if shuffle:
        X, y = shuffle_data(X, y, seed)
    # Split the training data from test data in the ratio specified in
    # test_size
    split_i = len(y) - int(len(y) // (1 / test_size))
    X_train, X_test = X[:split_i], X[split_i:]
    y_train, y_test = y[:split_i], y[split_i:]

    return X_train, X_test, y_train, y_test

def accuracy_score(y_true, y_pred):
    """ Compare y_true to y_pred and return the accuracy """
    accuracy = np.sum(y_true == y_pred, axis=0) / len(y_true)
    return accuracy

Weak Learner Tree

The building block of the XGBoost model is the creation of the underlying Tree as previously mentioned is based on smaller weak learners. A decision tree based model is used most of the time because of its flexibility with a variety of different data types. It also has a tendency of over-fitting which is something that can be exploiting in boosting methods.

We need to create Node class which will be equivalent of a tree.

The main methods of the class include:

  • __init__ which details how each tree will be created and the parameters used such as subsample_cols, min_laf, min_child_weight, depth, lambda, gamma and eps.
  • compute_gamma which calculates the optimal leaf value equation which will be used in determine splits
  • find_varsplit this loops through each column to find which one has the best split
  • find_greedy_split which is a method using the find_varsplit function to determine the best split
  • gain which is using the compute_gamma method to get the gain for each of the split which is used in the find_varsplit method

class Node:
    
    '''
    A single tree object that will be used for gradient boosting.
    '''

    def __init__(self, x, gradient, hessian, idxs, subsample_cols = 0.8 , min_leaf = 5, min_child_weight = 1 ,depth = 10, lambda_ = 1, gamma = 1, eps = 0.1):
      
        self.x, self.gradient, self.hessian = x, gradient, hessian
        self.idxs = idxs 
        self.depth = depth
        self.min_leaf = min_leaf
        self.lambda_ = lambda_
        self.gamma  = gamma
        self.min_child_weight = min_child_weight
        self.row_count = len(idxs)
        self.col_count = x.shape[1]
        self.subsample_cols = subsample_cols
        self.eps = eps
        self.column_subsample = np.random.permutation(self.col_count)[:round(self.subsample_cols*self.col_count)]
        
        self.val = self.compute_gamma(self.gradient[self.idxs], self.hessian[self.idxs])
          
        self.score = float('-inf')
        self.find_varsplit()
        
        
    def compute_gamma(self, gradient, hessian):
        '''
        Calculates the optimal leaf value equation (5) in "XGBoost: A Scalable Tree Boosting System"
        '''
        return(-np.sum(gradient) / (np.sum(hessian) + self.lambda_))
        
    def find_varsplit(self):
        '''
        Scans through every column and calculates the best split point.
        The node is then split at this point and two new nodes are created.
        Depth is only parameter to change as we have added a new layer to tre structure.
        If no split is better than the score initialised at the beginning then no splits further splits are ~~made~~
        '''
        for c in self.column_subsample: 
            self.find_greedy_split(c)
        if self.is_leaf: 
            return
        x = self.split_col
        lhs = np.nonzero(x <= self.split)[0]
        rhs = np.nonzero(x > self.split)[0]
        self.lhs = Node(x = self.x, gradient = self.gradient, hessian = self.hessian, idxs = self.idxs[lhs], min_leaf = self.min_leaf, depth = self.depth-1, lambda_ = self.lambda_ , gamma = self.gamma, min_child_weight = self.min_child_weight, eps = self.eps, subsample_cols = self.subsample_cols)
        self.rhs = Node(x = self.x, gradient = self.gradient, hessian = self.hessian, idxs = self.idxs[rhs], min_leaf = self.min_leaf, depth = self.depth-1, lambda_ = self.lambda_ , gamma = self.gamma, min_child_weight = self.min_child_weight, eps = self.eps, subsample_cols = self.subsample_cols)
        
    def find_greedy_split(self, var_idx):
        '''
         For a given feature greedily calculates the gain at each split.
         Globally updates the best score and split point if a better split point is found
        '''
        x = self.x[self.idxs, var_idx]
        
        for r in range(self.row_count):
            lhs = x <= x[r]
            rhs = x > x[r]
            
            lhs_indices = np.nonzero(x <= x[r])[0]
            rhs_indices = np.nonzero(x > x[r])[0]
            if(rhs.sum() < self.min_leaf or lhs.sum() < self.min_leaf 
               or self.hessian[lhs_indices].sum() < self.min_child_weight
               or self.hessian[rhs_indices].sum() < self.min_child_weight): continue

            curr_score = self.gain(lhs, rhs)
            if curr_score > self.score: 
                self.var_idx = var_idx
                self.score = curr_score
                self.split = x[r]
                                
    def gain(self, lhs, rhs):
        '''
        Calculates the gain at a particular split point bases on equation (7) from
        "XGBoost: A Scalable Tree Boosting System"
        '''
        gradient = self.gradient[self.idxs]
        hessian  = self.hessian[self.idxs]
        
        lhs_gradient = gradient[lhs].sum()
        lhs_hessian  = hessian[lhs].sum()
        
        rhs_gradient = gradient[rhs].sum()
        rhs_hessian  = hessian[rhs].sum()
        
        gain = 0.5 *( (lhs_gradient**2/(lhs_hessian + self.lambda_)) + (rhs_gradient**2/(rhs_hessian + self.lambda_)) - ((lhs_gradient + rhs_gradient)**2/(lhs_hessian + rhs_hessian + self.lambda_))) - self.gamma
        return(gain)
                
    @property
    def split_col(self):
        '''
        splits a column 
        '''
        return self.x[self.idxs , self.var_idx]
                
    @property
    def is_leaf(self):
        '''
        checks if node is a leaf
        '''
        return self.score == float('-inf') or self.depth <= 0                 

    def predict(self, x):
        return np.array([self.predict_row(xi) for xi in x])
    
    def predict_row(self, xi):
        if self.is_leaf:
            return(self.val)

        node = self.lhs if xi[self.var_idx] <= self.split else self.rhs
        return node.predict_row(xi)



The XGBoost class

Next, we'll define our XGBoost model class. We'll start by defining the creating the XGBoostTree which will wrap the Node Class created before.

class XGBoostTree:
    '''
    A single tree object that will be used for gradient boosting.
    '''
    def fit(self, x, gradient, hessian, subsample_cols = 0.8 , min_leaf = 5, min_child_weight = 1 ,depth = 10, lambda_ = 1, gamma = 1, eps = 0.1):
        self.dtree = Node(x, gradient, hessian, np.array(np.arange(len(x))), subsample_cols, min_leaf, min_child_weight, depth, lambda_, gamma, eps)
        return self
    
    def predict(self, X):
        return self.dtree.predict(X)


We then create the main XGBoostClassifier which takes in the learning rate, number of trees, and maximum depth of each tree as parameters.

Now we'll define the fit function, which will train our model on the input data. We'll start by initializing the decision tree model and adding it to our list of weak learners. Then, we'll iterate through the number of trees specified in the initialization function and add each one to the list.

Finally, we'll define the predict function, which will use our trained model to make predictions on new data. We'll simply iterate through each of the weak learners and add their predictions together, multiplied by the learning rate.

class XGBoostClassifier:
    '''
    A single tree object that will be used for gradient boosting.
    '''
    def __init__(self):
        self.estimators = []
    
    @staticmethod
    def sigmoid(x):
        return 1 / (1 + np.exp(-x))
    
    # first order gradient logLoss
    def grad(self, preds, labels):
        preds = self.sigmoid(preds)
        return(preds - labels)
    
    # second order gradient logLoss
    def hess(self, preds, labels):
        preds = self.sigmoid(preds)
        return(preds * (1 - preds))
    
    @staticmethod
    def log_odds(column):
        binary_yes = np.count_nonzero(column == 1)
        binary_no  = np.count_nonzero(column == 0)
        return(np.log(binary_yes/binary_no))
    
    
    def fit(self, X, y, subsample_cols = 0.8 , min_child_weight = 1, depth = 5, min_leaf = 5, learning_rate = 0.4, boosting_rounds = 5, lambda_ = 1.5, gamma = 1, eps = 0.1):
        self.X, self.y = X, y
        self.depth = depth
        self.subsample_cols = subsample_cols
        self.eps = eps
        self.min_child_weight = min_child_weight 
        self.min_leaf = min_leaf
        self.learning_rate = learning_rate
        self.boosting_rounds = boosting_rounds 
        self.lambda_ = lambda_
        self.gamma  = gamma
    
        self.base_pred = np.full((X.shape[0], 1), 1).flatten().astype('float64')
    
        for booster in range(self.boosting_rounds):
            Grad = self.grad(self.base_pred, self.y)
            Hess = self.hess(self.base_pred, self.y)
            boosting_tree = XGBoostTree().fit(self.X, Grad, Hess, depth = self.depth, min_leaf = self.min_leaf, lambda_ = self.lambda_, gamma = self.gamma, eps = self.eps, min_child_weight = self.min_child_weight, subsample_cols = self.subsample_cols)
            self.base_pred += self.learning_rate * boosting_tree.predict(self.X)
            self.estimators.append(boosting_tree)
          
    def predict_proba(self, X):
        pred = np.zeros(X.shape[0])
        
        for estimator in self.estimators:
            pred += self.learning_rate * estimator.predict(X) 
          
        return(self.sigmoid(np.full((X.shape[0], 1), 1).flatten().astype('float64') + pred))
    
    def predict(self, X):
        pred = np.zeros(X.shape[0])
        for estimator in self.estimators:
            pred += self.learning_rate * estimator.predict(X) 
        
        predicted_probas = self.sigmoid(np.full((X.shape[0], 1), 1).flatten().astype('float64') + pred)
        preds = np.where(predicted_probas > np.mean(predicted_probas), 1, 0)
        return(preds)
       

Running the Model

Now that we have created the the XGBoost model we can now test it loading the common iris dataset which is predict 3 types of plants (Iris setosa, Iris virginica and Iris versicolor) based their characteristics.


data = datasets.load_iris()
X = data.data
y = data.target

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4, seed=2)  

clf = XGBoostClassifier()

clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)

accuracy = accuracy_score(y_test, y_pred)

print ("Accuracy:", accuracy)

Conclusion

And that's it! We've created a simple version of XGBoost from scratch using Numpy. Of course, the real implementation of XGBoost is more complicated than this, in addition, there are now other top competitor implementations such as lightgbm and catboost. This provides a good starting point to understanding what's happening beneath the hood.

References