import bisect
from functools import reduce

import matplotlib.pyplot as plt
import numpy
import public


@public.add
def plot_err(scores, utility_mean, utility_err, color=None, label=None, alpha=0.5):
    """Plot mean utility against candidate thresholds with a shaded band
    between the lower and upper utility_err quantiles."""
    plt.plot(scores, utility_mean, color=color)
    plt.fill_between(scores, *utility_err, alpha=alpha, color=color, label=label)


@public.add
def expected_utility(utility, data, N=4096, credibility=0.5):
    """Get the utility distribution over possible thresholds.
    Args:
        utility (function): utility function that ingests true/false
            positive/negative rates.
        data (list-like): iterable of list-likes of the form (ground truth,
            score). Feedback is null when an alert is not triggered.
        credibility (float): Credibility level for a credible interval. This
            interval will be centered about the mean and have a `credibility`
            chance of containing the true utility.
    returns:
        tuple of three elements:
        - candidate thresholds
        - mean expected utility
        - upper and lower quantile of estimate of expected utility associated
          with each threshold
    """
    tail = (1 - credibility) / 2
    scores, utilities = sample_utilities(utility, data, N=N)
    low = int(N * tail)
    high = int(N * (1 - tail))
    utilities = numpy.asarray(utilities)
    utilities.sort(axis=1)
    return scores, utilities.mean(1), utilities[:, [low, high]].T
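

# Usage sketch (illustrative only): with a hypothetical cost-sensitive
# utility in which a false positive costs ten times what a true positive
# earns, the credible band can be plotted with plot_err above:
#
#     def cost_sensitive_utility(tp, fp, tn, fn):
#         return tp - 10 * fp
#
#     data = [(True, 0.9), (False, 0.2), (True, 0.7), (False, 0.4)]
#     scores, mean_utility, err = expected_utility(cost_sensitive_utility, data)
#     plot_err(scores, mean_utility, err, label="expected utility")
#     plt.legend()
#     plt.show()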


@public.add
def optimal_threshold(utility, data, N=4096):
    """Return the (threshold, mean utility) pair whose Monte Carlo mean
    utility is highest."""
    scores, utilities = sample_utilities(utility, data, N=N)
    means = utilities.mean(1)
    idx = means.argmax()
    return scores[idx], means[idx] 


@public.add
def sample_utilities(utility, data, N=4096):
    """Get distribution of utilities.
    Args:
        utility (float): utility function that ingests true/false
            positive/negative rates.
        data (list-like): iterable of of iterables of the form (ground truth, score).
            Feedback is null when an alert is not triggered.
    returns: thresholds, utilities
    """
    if not len(data):
        return data, numpy.asarray([])
    nprng = numpy.random.RandomState(0)
    data = numpy.asarray(data)
    num_positives = data[:, 0].sum()
    # Dirichlet pseudo-counts for (tp, fp, tn, fn) with a +1 uniform prior:
    # when the threshold sits below every score, all positives are true
    # positives and all negatives are false positives.
    rates = [1 + num_positives, 1 + len(data) - num_positives, 1, 1]
    utilities = []
    # Walk the candidate thresholds in ascending score order, moving one
    # observation below the threshold at each step.
    data = data[numpy.argsort(data[:, 1])]
    for ground_truth, score in data:
        update_rates(rates, ground_truth)
        utilities.append(utility(*nprng.dirichlet(rates, size=N).T))
    return data[:, 1], numpy.asarray(utilities)
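

# Shape sketch (hypothetical data): for M scored alerts, sample_utilities
# returns the M candidate thresholds in ascending score order and an (M, N)
# array whose i-th row is the sampled utility distribution when the threshold
# sits at the i-th score:
#
#     data = [(True, 0.8), (False, 0.3), (True, 0.6)]
#     scores, utilities = sample_utilities(
#         lambda tp, fp, tn, fn: tp - fp, data, N=256
#     )
#     assert scores.shape == (3,) and utilities.shape == (3, 256)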


@public.add
def thompson_sample(utility, data, N=1024, quantile=False):
    """Thompson sample N thresholds: each Monte Carlo draw of the rates votes
    for the threshold that maximizes its utility. If `quantile` is True,
    return each winner's index as a fraction of the candidate range instead
    of its raw score."""
    scores, utilities = sample_utilities(utility, data, N)
    if quantile:
        return utilities.argmax(axis=0) / (len(utilities) - 1)
    return scores[utilities.argmax(axis=0)] 
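

# Usage sketch (hypothetical utility and data): thompson_sample returns N
# thresholds, one per Monte Carlo draw, so each candidate score appears
# roughly in proportion to its probability of being optimal; drawing one at
# random is one way to pick the next threshold to deploy:
#
#     data = [(True, 0.9), (False, 0.2), (True, 0.7), (False, 0.4)]
#     sampled = thompson_sample(lambda tp, fp, tn, fn: tp - 2 * fp, data)
#     next_threshold = numpy.random.choice(sampled)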


@public.add
def update_rates(rates, ground_truth):
    """Move one observation from above the threshold to below it in the
    (tp, fp, tn, fn) pseudo-counts."""
    rates[0] -= ground_truth
    rates[1] -= not ground_truth
    rates[2] += not ground_truth
    rates[3] += ground_truth 


@public.add
class AdaptiveThreshold:
    """Adaptive agent that balances exploration with exploitation with respect
    to setting and adjusting thresholds.
    When exploring, the threshold is 0, effectively letting anything
    through. This produces unbiased data that can then be used to set a
    more optimal threshold in subsequent rounds. The agent seeks to
    balance the opportunity cost of running an experiment with the
    utility gained over subsequent rounds using the information gained
    from this experiment.
    """

    def __init__(self, utility):
        """
        Args:
            utility (function): Function that takes in true/false
                positive/negative rates. Specifically (tp, fp, tn fn) -> float
                representing utility."""
        self.utility = utility
        self.results = []
        # Pseudo-counts (with a +1 uniform prior) of outcomes observed during
        # exploration rounds.
        self.unbiased_positives = 1
        self.unbiased_negatives = 1
        # A threshold of 0 marks the next round as an exploration round.
        self.previous_threshold = 0
        self.nprng = numpy.random.RandomState(0)

    def get_best_threshold(self):
        """Sample one utility per candidate threshold and return
        (best_threshold, experiment_utility, best_utility, hindsight_utility),
        where experiment_utility is the sampled utility of exploring
        (threshold 0) and hindsight_utility is the sampled utility of the
        threshold that would have looked best without the last experiment's
        outcome."""
        # true positives, false positives, true negatives, false negatives
        rates = [self.unbiased_positives, self.unbiased_negatives, 1, 1]
        experiment_utility = self.utility(*self.nprng.dirichlet(rates))
        hypothetical_rates = [
            self.unbiased_positives - self.last_experiment_outcome,
            self.unbiased_negatives - (1 - self.last_experiment_outcome),
            1,
            1,
        ]
        best_hypothetical_utility = -numpy.inf
        best_utility = -numpy.inf
        for score, ground_truth, idx in self.results:
            update_rates(rates, ground_truth)
            utility = self.utility(*self.nprng.dirichlet(rates))
            if utility > best_utility:
                best_utility = utility
                best_threshold = score
            if idx >= self.last_experiment_idx:
                continue
            update_rates(hypothetical_rates, ground_truth)
            hypothetical_utility = self.utility(
                *self.nprng.dirichlet(hypothetical_rates)
            )
            if hypothetical_utility > best_hypothetical_utility:
                best_hypothetical_utility = hypothetical_utility
                hindsight_utility = utility
        return best_threshold, experiment_utility, best_utility, hindsight_utility 

    def __call__(self, ground_truth, score):
        """Args are ignored if previous threshold was not 0. Otherwise, the
        score is added as a potential threhold and ground_truth noted to help
        identify the optimal threshold.
        Args:
            ground_truth (bool)
            score (float)
        """
        idx = len(self.results)
        if self.previous_threshold == 0:
            # The previous round was an exploration round, so this
            # observation is unbiased.
            bisect.insort(self.results, (score, ground_truth, idx))
            self.unbiased_positives += ground_truth
            self.unbiased_negatives += 1 - ground_truth
            self.last_experiment_idx = idx
            self.last_experiment_outcome = ground_truth
        if len(self.results) < 2:
            return self.previous_threshold
        (
            best_threshold,
            experiment_utility,
            best_utility,
            hindsight_utility,
        ) = self.get_best_threshold()
        # Utility gained since the last experiment thanks to its information,
        # compared against the opportunity cost of running another experiment.
        total_utility_gained = (best_utility - hindsight_utility) * (
            idx - self.last_experiment_idx
        )
        opportunity_cost = hindsight_utility - experiment_utility
        if opportunity_cost <= total_utility_gained:
            # Exploring looks worthwhile: let everything through next round.
            self.previous_threshold = 0
        else:
            self.previous_threshold = best_threshold
        return self.previous_threshold
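

# Usage sketch (hypothetical feedback stream): the agent consumes one
# (ground_truth, score) pair per round and returns the threshold to apply
# next; a returned threshold of 0 signals an exploration round in which every
# alert should be let through to gather unbiased feedback:
#
#     agent = AdaptiveThreshold(lambda tp, fp, tn, fn: tp - 5 * fp)
#     stream = [(True, 0.9), (False, 0.1), (False, 0.6), (True, 0.8)]
#     thresholds = [agent(truth, score) for truth, score in stream]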


@public.add
def exploration_proportion(thresholds, N):
    """Exponential moving average (effective window N) of the fraction of
    rounds that were exploration rounds (threshold == 0), seeded with the
    plain mean over the first N rounds."""
    thresholds = numpy.asarray(thresholds)
    exploration = thresholds == 0
    alpha = 1 - 1.0 / N
    return reduce(
        lambda accum, elem: accum + [accum[-1] * alpha + elem * (1 - alpha)],
        exploration[N:],
        [exploration[:N].mean()],
    )
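

# Usage sketch (continuing the hypothetical stream above): the running
# fraction of exploration rounds can be tracked from the thresholds the agent
# returned, here with an effective averaging window of two rounds:
#
#     proportions = exploration_proportion(numpy.asarray(thresholds), N=2)
#     plt.plot(proportions)
#     plt.ylabel("exploration proportion")
#     plt.show()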