Source code for cvopt.model_selection._base

import os, sys, copy, time
import pandas as pd, numpy as np
from datetime import datetime
from abc import ABCMeta, abstractmethod

from sklearn.utils.metaestimators import if_delegate_has_method
from sklearn.base import BaseEstimator, is_classifier
from sklearn.metrics import SCORERS, make_scorer
from sklearn.metrics.scorer import check_scoring
from sklearn.model_selection import check_cv
from sklearn.externals.joblib import Parallel, delayed
from hyperopt import STATUS_OK, STATUS_FAIL

from ..model_selection import _setting as st
from ..search_setting._base import conv_param_distributions, search_category, decode_params
from ..utils._base import chk_Xy, make_cloner, compress, make_saver, to_label
from ..utils._logger import CVSummarizer

class BaseSearcher(BaseEstimator, metaclass=ABCMeta):
    """
    Base class for cross-validation optimizers.

    Class Variables
    ---------------
    score_summarizer: callable
        Function used to summarize the per-fold cross-validation scores
        (default: numpy.mean).
    score_summarizer_name: str
        Name of the score summarizer; used in the log file.
    feature_axis: int
        Axis of X along which feature selection is applied (1 = columns).
    """
    score_summarizer = np.mean
    score_summarizer_name = "mean"
    feature_axis = 1

    def __init__(self, estimator, param_distributions, 
                 scoring, cv, n_jobs, pre_dispatch, 
                 verbose, logdir, save_estimator, saver, model_id, cloner, refit, backend):
        # BACKLOG: Implement the iid option (sklearn's GridSearchCV has this option).
        
        self.estimator = estimator
        self.scoring = check_scoring(estimator, scoring=scoring)
        if hasattr(self.scoring, "_sign"):
            self.sign = self.scoring._sign
        else:
            self.sign = 1
            # In this case scoring is None, so the estimator's default scorer is used.
            # scikit-learn's default scorers (r2_score or accuracy) are greater-is-better,
            # so sign = 1.
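            # For illustration (a sketch; mean_squared_error is just an example
            # metric, not something this class requires):
            #   make_scorer(mean_squared_error, greater_is_better=False)._sign -> -1
            #   make_scorer(mean_squared_error, greater_is_better=True)._sign  ->  1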

        self.cv = cv
        self.param_distributions = copy.deepcopy(param_distributions)
        self.verbose = verbose
        self.n_jobs = n_jobs
        self.pre_dispatch = pre_dispatch
        self.logdir = logdir
        self.save_estimator = save_estimator
        self.saver = saver
        self.cloner = cloner
        self._cloner = make_cloner(cloner)

        self.backend = backend

        if model_id is None:
            self.model_id = datetime.today().strftime("%Y%m%d_%H%M%S")
        else:
            self.model_id = model_id
        self.refit = refit


    def _preproc_fit(self, X, y, validation_data, feature_groups):
        X = chk_Xy(X, none_error=True, ravel_1d=False, msg_sjt="X")
        y = chk_Xy(y, none_error=False, ravel_1d=True, msg_sjt="y")
        if validation_data is None:
            Xvalid, yvalid = None, None
            valid = False
        else:
            Xvalid = chk_Xy(validation_data[0], none_error=False, ravel_1d=False, msg_sjt="Xvalid")
            yvalid = chk_Xy(validation_data[1], none_error=False, ravel_1d=True, msg_sjt="yvalid")
            valid = True
        cv = check_cv(self.cv, y, classifier=is_classifier(self.estimator))
        self.n_splits_ = cv.get_n_splits()

        if feature_groups is None:
            self._feature_select = False
            param_distributions = {}
        else:
            self._feature_select = True
            param_distributions = mk_feature_select_params(feature_groups, n_samples=X.shape[0], 
                                                           n_features=X.shape[BaseSearcher.feature_axis])
        param_distributions.update(self.param_distributions)
        
        self._cvs = CVSummarizer(paraname_list=param_distributions.keys(), cvsize=self.n_splits_, 
                                 score_summarizer=BaseSearcher.score_summarizer, score_summarizer_name=BaseSearcher.score_summarizer_name, 
                                 valid=valid, sign=self.sign, model_id=self.model_id, search_algo=self.search_algo, 
                                 verbose=self.verbose, save_estimator=self.save_estimator, logdir=self.logdir)
        self.cv_results_ = self._cvs()

        return X, y, Xvalid, yvalid, cv, conv_param_distributions(param_distributions, backend=self.backend)


    def _postproc_fit(self, X, y, feature_groups, best_params, best_score):
        if self._cvs.nbv is not None:
            self._cvs.nbv.close(search_algo=str(self.cv_results_["search_algo"][0]), 
                                n_iter=int(self.cv_results_["index"][-1]))
                                
        self.best_params_ = best_params
        self.best_score_ = best_score
        
        if self.refit:
            if self._feature_select:
                self.feature_select_ind, _, estimator_params = mk_feature_select_index(self.best_params_, feature_groups, verbose=1)
                self.best_estimator_ = self._cloner(self.estimator, estimator_params)
            else:
                self.feature_select_ind = np.array([True]*X.shape[BaseSearcher.feature_axis])
                self.best_estimator_ = self._cloner(self.estimator, self.best_params_)
            if y is None:
                self.best_estimator_.fit(compress(self.feature_select_ind, X, axis=BaseSearcher.feature_axis))
            else:
                self.best_estimator_.fit(compress(self.feature_select_ind, X, axis=BaseSearcher.feature_axis), y)
        if self.verbose == 1:
            sys.stdout.write("\n\rBest_score(finished):%s" % np.round(self.best_score_, 2))

    def _random_scoring(self, X, y):
        try:
            # Estimate a chance-level score without an estimator:
            # score y against random permutations of itself.
            score = []
            for _ in range(10):
                score.append(self.scoring._sign*self.scoring._score_func(y, np.random.permutation(y)))
            score = np.array(score)
            score[np.isinf(score)] = np.nan
            score = np.nanmedian(score)
        except Exception:
            try:
                # Fall back to estimating the chance-level score with a fitted estimator.
                score = []
                random_estimator = self._cloner(self.estimator, {})
                random_estimator.fit(X, y)
                for _ in range(10):
                    score.append(self.scoring(random_estimator, X, np.random.permutation(y)))
                score = np.array(score)
                score[np.isinf(score)] = np.nan
                score = np.nanmedian(score)
            except Exception:
                score = np.nan

        if np.isnan(score):
            score = self.scoring._sign*st.RANDOM_SCORE
        return score

    @if_delegate_has_method(delegate=("best_estimator_", "estimator"))
    def predict(self, X):
        """
        Call predict on the estimator with the best found parameters.

        Parameters
        ----------       
        X : numpy.ndarray, pandas.DataFrame or scipy.sparse matrix, shape(axis=0) = (n_samples)
            Features. Details depend on the estimator.
        """
        X = chk_Xy(X, none_error=True, ravel_1d=False, msg_sjt="X")
        return self.best_estimator_.predict(compress(self.feature_select_ind, X, axis=BaseSearcher.feature_axis))

    @if_delegate_has_method(delegate=("best_estimator_", "estimator"))
    def predict_proba(self, X):
        """
        Call predict_proba on the estimator with the best found parameters.
        
        Parameters
        ----------       
        X : numpy.ndarray, pandas.DataFrame or scipy.sparse matrix, shape(axis=0) = (n_samples)
            Features. Details depend on the estimator.
        """
        X = chk_Xy(X, none_error=True, ravel_1d=False, msg_sjt="X")
        return self.best_estimator_.predict_proba(compress(self.feature_select_ind, X, axis=BaseSearcher.feature_axis))

    @if_delegate_has_method(delegate=("best_estimator_", "estimator"))
    def predict_log_proba(self, X):
        """
        Call predict_log_proba on the estimator with the best found parameters.
        
        Parameters
        ----------       
        X : numpy.ndarray, pandas.DataFrame or scipy.sparse matrix, shape(axis=0) = (n_samples)
            Features. Details depend on the estimator.
        """
        X = chk_Xy(X, none_error=True, ravel_1d=False, msg_sjt="X")
        return self.best_estimator_.predict_log_proba(compress(self.feature_select_ind, X, axis=BaseSearcher.feature_axis))

    @if_delegate_has_method(delegate=("best_estimator_", "estimator"))
    def decision_function(self, X):
        """
        Call decision_function on the estimator with the best found parameters.
        
        Parameters
        ----------       
        X : numpy.ndarray, pandas.DataFrame or scipy.sparse matrix, shape(axis=0) = (n_samples)
            Features. Details depend on the estimator.
        """
        X = chk_Xy(X, none_error=True, ravel_1d=False, msg_sjt="X")
        return self.best_estimator_.decision_function(compress(self.feature_select_ind, X, axis=BaseSearcher.feature_axis))

    @if_delegate_has_method(delegate=("best_estimator_", "estimator"))
    def transform(self, X):
        """
        Call transform on the estimator with the best found parameters.
        
        Parameters
        ----------       
        X : numpy.ndarray, pandas.DataFrame or scipy.sparse matrix, shape(axis=0) = (n_samples)
            Features. Details depend on the estimator.
        """
        X = chk_Xy(X, none_error=True, ravel_1d=False, msg_sjt="X")
        return self.best_estimator_.transform(compress(self.feature_select_ind, X, axis=BaseSearcher.feature_axis))

    @if_delegate_has_method(delegate=("best_estimator_", "estimator"))
    def inverse_transform(self, Xt):
        """
        Call inverse_transform on the estimator with the best found parameters.
        
        Parameters
        ----------       
        Xt : numpy.ndarray, pandas.DataFrame or scipy.sparse matrix, shape(axis=0) = (n_samples)
            Features. Details depend on the estimator.
        """
        Xt = chk_Xy(Xt, none_error=True, ravel_1d=False, msg_sjt="Xt")
        return self.best_estimator_.inverse_transform(compress(self.feature_select_ind, Xt, axis=BaseSearcher.feature_axis))

    @if_delegate_has_method(delegate=("best_estimator_", "estimator"))
    def apply(self, X):
        """
        Call apply on the estimator with the best found parameters.
        
        Parameters
        ----------       
        X : numpy.ndarray, pandas.DataFrame or scipy.sparse matrix, shape(axis=0) = (n_samples)
            Features. Details depend on the estimator.
        """
        X = chk_Xy(X, none_error=True, ravel_1d=False, msg_sjt="X")
        return self.best_estimator_.apply(compress(self.feature_select_ind, X, axis=BaseSearcher.feature_axis))

    @if_delegate_has_method(delegate=("best_estimator_", "estimator"))
    def classes_(self):
        """
        Return the list of target classes.
        """
        return self.best_estimator_.classes_



def mk_feature_select_params(feature_groups, n_samples, n_features):
    """
    Set param_distributions for feature_select.
    """
    if len(feature_groups) != n_features:
        raise Exception("feature_groups length must be equal to the number of features in X (n_features).")
    
    feature_group_names = np.unique(feature_groups)
    feature_group_names.sort()
    
    ret = {}
    for i in feature_group_names:
        tmp = st.FEATURE_SELECT_PARAMNAME_PREFIX + str(i)
        if (i == st.ALWAYS_USED_FEATURE_GROUP_ID) or (n_samples == 1):
            # This group is always used (cannot be deselected).
            ret[tmp] = search_category([True])
        else:
            ret[tmp] = search_category([True, False])
    return ret
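
# Illustrative example (a sketch: the actual prefix string and the always-used
# group id live in _setting; "group_" and 0 are assumed here):
#   mk_feature_select_params([0, 0, 1, 2], n_samples=100, n_features=4)
#   -> {"group_0": search_category([True]),        # always-used group
#       "group_1": search_category([True, False]),
#       "group_2": search_category([True, False])}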



def mk_feature_select_index(params, feature_groups, verbose):
    """
    Make feature select index.
    """
    true_group_names = []
    other_params = copy.deepcopy(params)
    feature_params = {}
    for key in params.keys():
        if st.FEATURE_SELECT_PARAMNAME_PREFIX in key:
            feature_params[key] = other_params.pop(key)
            if params[key]:
                true_group_names.append(int(key.split(st.FEATURE_SELECT_PARAMNAME_PREFIX)[-1]))

    if verbose:
        return np.in1d(feature_groups, true_group_names), feature_params, other_params
    else:
        return np.in1d(feature_groups, true_group_names)
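
# Illustrative example (a sketch; assuming the prefix "group_" as above):
#   mask, fparams, eparams = mk_feature_select_index(
#       {"group_0": True, "group_1": False, "C": 1.0}, [0, 0, 1], verbose=1)
#   mask    -> array([ True,  True, False])
#   fparams -> {"group_0": True, "group_1": False}
#   eparams -> {"C": 1.0}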



def fit_and_score(estimator, X, y, scoring, train_ind=None, test_ind=None, test_data=None):
    """
    Run fit and compute the evaluation scores.

    Parameters
    ----------
    test_data: tuple(X, y)
        When test_data is not None, test_ind is ignored and test_data is used
        to compute score_test.
    """
    if train_ind is None:
        train_ind = np.arange(X.shape[0])

    start = time.time()
    estimator.fit(X[train_ind], y[train_ind])
    fittime = time.time() - start

    start = time.time()
    score_train = scoring(estimator, X[train_ind], y[train_ind])
    scoretime = time.time() - start

    if test_data is None:
        score_test = scoring(estimator, X[test_ind], y[test_ind])
    else:
        score_test = scoring(estimator, *test_data)

    return score_train, score_test, fittime, scoretime, estimator
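
# Illustrative usage (a sketch; the index arrays and the scorer are placeholders
# supplied by the caller in practice):
#   tr, te = np.arange(0, 80), np.arange(80, 100)
#   score_train, score_test, fittime, scoretime, fitted = fit_and_score(
#       estimator, X, y, scoring, train_ind=tr, test_ind=te)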



def _obj_return(score, succeed, backend):
    """
    Differences in score orientation (greater-is-better or not) are absorbed by
    the sklearn-style scoring function, so the input to the minimization
    function is -1 * the score function's output.

    Example
    -------
    greater_is_better=True: score function output = 1*score

    greater_is_better=False: score function output = -1*score
    """

    if backend == "hyperopt":
        if succeed:
            return {"loss": -1.0*score, "status": STATUS_OK}
        else:
            return {"loss": score, "status": STATUS_FAIL}

    elif (backend == "bayesopt") or (backend == "gaopt"):
        return -1.0*score

    else:
        raise ValueError("Unsupported backend: %s" % backend)
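
# Illustrative return values (a sketch):
#   _obj_return(0.8, succeed=True, backend="hyperopt") -> {"loss": -0.8, "status": STATUS_OK}
#   _obj_return(0.8, succeed=True, backend="bayesopt") -> -0.8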



def mk_objfunc(X, y, groups, feature_groups, feature_axis, estimator, scoring, cv, 
               param_distributions, backend, failedscore, saver, cloner, 
               score_summarizer=np.mean, 
               Xvalid=None, yvalid=None, n_jobs=1, pre_dispatch="2*n_jobs", 
               cvsummarizer=None, save_estimator=0, min_n_features=2):
    """
    Function to make search objective function(input:params, output:evaluation index)
    """
    n_splits_ = cv.get_n_splits()
    cvs = cvsummarizer
    saver = make_saver(saver)

    def obj(params):
        start_time = datetime.now()
        params = decode_params(params=params, param_distributions=param_distributions, backend=backend)

        cvs.display_status(params=params, start_time=start_time)

        if feature_groups is None:
            feature_select = False
            feature_select_ind = np.array([True]*X.shape[feature_axis])
            estimator_params = copy.deepcopy(params)
        else:
            feature_select = True
            feature_select_ind, _, estimator_params = mk_feature_select_index(params, feature_groups, verbose=1)
            if feature_select_ind.sum() < min_n_features:
                score = np.nan
                if cvs is not None:
                    end_time = datetime.now()
                    cvs.store_cv_result(cv_train_scores=[np.nan]*n_splits_, cv_test_scores=[np.nan]*n_splits_, params=params, 
                                        fit_times=[np.nan]*n_splits_, score_times=[np.nan]*n_splits_, feature_select=feature_select,
                                        X_shape=compress(feature_select_ind, X, axis=feature_axis).shape, 
                                        start_time=start_time, end_time=end_time, 
                                        train_score=np.nan, validation_score=np.nan)
                return _obj_return(score=failedscore, succeed=False, backend=backend)

        # cross validation
        cv_train_scores = []
        cv_test_scores = []
        fit_times = []
        score_times = []

        ret_p = Parallel(
            n_jobs=n_jobs, pre_dispatch=pre_dispatch, 
        )(delayed(fit_and_score)(estimator=cloner(estimator, estimator_params), 
                                 X=compress(feature_select_ind, X, axis=feature_axis),  
                                 y=y, 
                                 train_ind=train_ind, test_ind=test_ind, 
                                 scoring=scoring)
        for train_ind, test_ind in cv.split(compress(feature_select_ind, X, axis=feature_axis), to_label(y), groups))

        # evaluate validation data
        if Xvalid is None:
            train_score, validation_score = np.nan, np.nan
        else:
            train_score, validation_score, _, _, estimator_test = fit_and_score(
                estimator=cloner(estimator, estimator_params), 
                X=compress(feature_select_ind, X, axis=feature_axis),  
                y=y, 
                test_data=(compress(feature_select_ind, Xvalid, axis=feature_axis), yvalid), 
                scoring=scoring)

        # summarize
        if (cvs.logdir is not None) and (save_estimator > 0):
            estimators_cv = []
            path = os.path.join(cvs.logdir, "estimators", cvs.model_id)
            name_prefix = cvs.model_id + "_index" + "{0:05d}".format(len(cvs.cv_results_["params"]))
            for cnt, (i, j, k, l, m) in enumerate(ret_p):
                cv_train_scores.append(i)
                cv_test_scores.append(j)
                fit_times.append(k)
                score_times.append(l)

                saver(m, os.path.join(path, name_prefix+"_split"+"{0:02d}".format(cnt)))

            if (save_estimator > 1):
                if Xvalid is None:
                    estimator_test = cloner(estimator, estimator_params)
                    estimator_test.fit(compress(feature_select_ind, X, axis=feature_axis), y)
                saver(estimator_test, os.path.join(path, name_prefix+"_test"))
        else:
            for i, j, k, l, m in ret_p:
                cv_train_scores.append(i)
                cv_test_scores.append(j)
                fit_times.append(k)
                score_times.append(l)

        if cvs is not None:
            end_time = datetime.now()
            cvs.store_cv_result(cv_train_scores=cv_train_scores, cv_test_scores=cv_test_scores, params=params, 
                                fit_times=fit_times, score_times=score_times, feature_select=feature_select,
                                X_shape=compress(feature_select_ind, X, axis=feature_axis).shape,
                                start_time=start_time, end_time=end_time, 
                                train_score=train_score, validation_score=validation_score)
        score = score_summarizer(cv_test_scores)
        return _obj_return(score=score, succeed=True, backend=backend)
    return obj
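

# Illustrative consumption of the objective function with the hyperopt backend
# (a sketch, not part of this module's API; fmin, tpe and Trials are hyperopt's
# public API, and all argument values below are placeholders):
#   from hyperopt import fmin, tpe, Trials
#   obj = mk_objfunc(X, y, groups=None, feature_groups=None, feature_axis=1,
#                    estimator=estimator, scoring=scoring, cv=cv,
#                    param_distributions=param_distributions, backend="hyperopt",
#                    failedscore=failedscore, saver=saver, cloner=cloner,
#                    cvsummarizer=cvs)
#   trials = Trials()
#   best = fmin(fn=obj,
#               space=conv_param_distributions(param_distributions, backend="hyperopt"),
#               algo=tpe.suggest, max_evals=25, trials=trials)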