import public
import bisect
import numpy
import matplotlib.pylab as plt
from functools import reduce
@public.add
def plot_err(scores, utility_mean, utility_err, color=None, label=None, alpha=0.5):
    """Draw a utility curve together with a shaded band around it.

    Args:
        scores: x-axis values (the candidate thresholds).
        utility_mean: y-axis values of the line, one per score.
        utility_err: iterable unpacked into the remaining positional
            arguments of ``fill_between`` (typically the lower and upper
            edges of the band).
        color: colour shared by the line and the band.
        label: legend label; attached to the band only, not to the line.
        alpha: opacity of the band.
    """
    band_style = {"alpha": alpha, "color": color, "label": label}
    plt.plot(scores, utility_mean, color=color)
    plt.fill_between(scores, *utility_err, **band_style)
@public.add
def expected_utility(utility, data, N=4096, credibility=0.5):
    """Get the utility distribution over possible thresholds.

    Args:
        utility (function): utility function that ingests true/false
            positive/negative rates.
        data (list-like): iterable of list-likes of the form (ground truth,
            score).
        N (int): number of utility samples drawn per candidate threshold.
        credibility (float): Credibility level, in [0, 1], of the reported
            interval. The interval is equal-tailed: ``(1 - credibility) / 2``
            of the sampled utilities lie below it and the same share above.

    Returns:
        tuple of three elements:
            - candidate thresholds
            - mean expected utility per threshold
            - array of shape (2, n_thresholds) holding the lower and upper
              quantile of the sampled utility for each threshold

    Raises:
        ValueError: if ``credibility`` is outside [0, 1].
    """
    if not 0 <= credibility <= 1:
        raise ValueError("credibility must be between 0 and 1")

    scores, utilities = sample_utilities(utility, data, N=N)
    utilities = numpy.asarray(utilities)
    if utilities.size == 0:
        # No candidates: keep a 2-D layout so the axis-1 reductions below
        # yield empty results instead of raising.
        utilities = utilities.reshape(0, N)
    utilities = numpy.sort(utilities, axis=1)

    # Probability mass left outside the interval on each side. Halving
    # `credibility` itself is only correct at 0.5; e.g. 0.9 would pick the
    # 45%/55% quantiles, i.e. a 10% interval.
    tail = (1 - credibility) / 2
    low = int(N * tail)
    # Clamp so that credibility == 1 selects the last sample, not index N.
    high = min(int(N * (1 - tail)), N - 1)
    return scores, utilities.mean(1), utilities[:, [low, high]].T
@public.add
def optimal_threshold(utility, data, N=4096):
    """Pick the candidate threshold whose mean sampled utility is highest.

    Args:
        utility (function): utility function that ingests true/false
            positive/negative rates.
        data (list-like): iterable of (ground truth, score) pairs.
        N (int): number of utility samples drawn per candidate threshold.

    Returns:
        tuple: (best threshold, its mean sampled utility).
    """
    thresholds, samples = sample_utilities(utility, data, N=N)
    mean_utility = samples.mean(1)
    winner = mean_utility.argmax()
    return thresholds[winner], mean_utility[winner]
@public.add
def sample_utilities(utility, data, N=4096):
    """Get distribution of utilities for every candidate threshold.

    Each observed score is a candidate threshold. Examples are visited in
    ascending score order; before sampling for a score, that example is
    moved from the alerted counts (true/false positive) to the non-alerted
    counts (false/true negative). The four counts, each offset by one, are
    the parameters of a Dirichlet distribution from which ``N`` rate vectors
    are drawn and passed to ``utility``.

    Sampling uses a fixed seed (0), so repeated calls with the same inputs
    return the same samples.

    Args:
        utility (function): utility function that ingests true/false
            positive/negative rates, in the order (tp, fp, tn, fn).
        data (list-like): iterable of iterables of the form
            (ground truth, score).
        N (int): number of samples drawn per candidate threshold.

    Returns:
        tuple: (thresholds, utilities). ``thresholds`` holds the scores in
        ascending order; ``utilities`` has one row per threshold. For empty
        ``data`` the shapes are ``(0,)`` and ``(0, N)``.
    """
    if not len(data):
        # Keep the same layout as the non-empty case so callers can still
        # reduce over axis 1 without special-casing empty input.
        return numpy.empty(0), numpy.empty((0, N))

    nprng = numpy.random.RandomState(0)
    data = numpy.asarray(data)
    num_positives = data[:, 0].sum()
    # Dirichlet parameters in (tp, fp, tn, fn) order. Initially everything
    # is alerted on, so all positives are true positives and all negatives
    # are false positives; the +1 / 1 terms keep every parameter positive.
    rates = [1 + num_positives, 1 + len(data) - num_positives, 1, 1]

    data = data[numpy.argsort(data[:, 1])]
    utilities = []
    for ground_truth, _score in data:
        update_rates(rates, ground_truth)
        utilities.append(utility(*nprng.dirichlet(rates, size=N).T))
    return data[:, 1], numpy.asarray(utilities)
@public.add
def thompson_sample(utility, data, N=1024, quantile=False):
    """Draw thresholds by Thompson sampling.

    For each of the ``N`` sampled utility vectors, the candidate threshold
    with the highest sampled utility is selected.

    Args:
        utility (function): utility function that ingests true/false
            positive/negative rates.
        data (list-like): iterable of (ground truth, score) pairs.
        N (int): number of draws.
        quantile (bool): when true, return each winner's rank among the
            sorted candidates scaled to [0, 1] instead of its score.

    Returns:
        Array of ``N`` selected thresholds, or of ``N`` scaled ranks when
        ``quantile`` is true.
    """
    scores, utilities = sample_utilities(utility, data, N)
    winners = utilities.argmax(axis=0)
    if quantile:
        # With a single candidate the rank range is zero; guard the
        # denominator so the result is 0.0 rather than NaN from 0 / 0.
        return winners / max(len(utilities) - 1, 1)
    return scores[winners]
@public.add
def update_rates(rates, ground_truth):
    """Shift one example between the four outcome counts, in place.

    ``rates`` is indexed as (tp, fp, tn, fn). A truthy ``ground_truth``
    moves one unit from index 0 to index 3; a falsy one moves one unit from
    index 1 to index 2. Nothing is returned.

    Args:
        rates: mutable sequence of four counts, modified in place.
        ground_truth: label of the example being moved.
    """
    is_negative = not ground_truth
    # Positive example: true positive -> false negative.
    rates[0] -= ground_truth
    rates[3] += ground_truth
    # Negative example: false positive -> true negative.
    rates[1] -= is_negative
    rates[2] += is_negative
@public.add
class AdaptiveThreshold:
    """Adaptive agent that balances exploration with exploitation with respect
    to setting and adjusting thresholds.

    When exploring, the threshold is 0, effectively letting anything
    through. This produces unbiased data that can then be used to set a
    more optimal threshold in subsequent rounds. The agent seeks to
    balance the opportunity cost of running an experiment with the
    utility gained over subsequent rounds using the information gained
    from this experiment.
    """

    def __init__(self, utility):
        """
        Args:
            utility (function): Function that takes in true/false
                positive/negative rates. Specifically
                (tp, fp, tn, fn) -> float representing utility.
        """
        self.utility = utility
        # Sorted list of (score, ground_truth, idx) from exploration rounds.
        self.results = []
        # Counts start at 1 so the Dirichlet parameters are always positive.
        self.unbiased_positives = 1
        self.unbiased_negatives = 1
        self.previous_threshold = 0
        # Set on every exploration round; declared here so the attributes
        # always exist on the instance.
        self.last_experiment_idx = None
        self.last_experiment_outcome = None
        self.nprng = numpy.random.RandomState(0)

    def get_best_threshold(self):
        """Sample utilities for every recorded score and pick the best one.

        Two sets of Dirichlet parameters are swept over the sorted results:
        one built from all recorded outcomes, and a hypothetical one that
        leaves out the most recent experiment.

        Returns:
            tuple: (best_threshold, experiment_utility, best_utility,
            hindsight_utility) where

            - ``best_threshold`` is the score with the highest sampled
              utility using all outcomes,
            - ``experiment_utility`` is the sampled utility before any
              result is moved to the non-alerted side,
            - ``best_utility`` is the sampled utility at ``best_threshold``,
            - ``hindsight_utility`` is the all-outcomes sampled utility at
              the score that looked best under the hypothetical parameters.
        """
        # true positives, false positives, true negatives, false negatives
        rates = [self.unbiased_positives, self.unbiased_negatives, 1, 1]
        experiment_utility = self.utility(*self.nprng.dirichlet(rates))
        # Same starting counts with the last experiment's outcome removed.
        hypothetical_rates = [
            self.unbiased_positives - self.last_experiment_outcome,
            self.unbiased_negatives - (1 - self.last_experiment_outcome),
            1,
            1,
        ]
        best_hypothetical_utility = -numpy.inf
        best_utility = -numpy.inf
        for score, ground_truth, idx in self.results:
            update_rates(rates, ground_truth)
            utility = self.utility(*self.nprng.dirichlet(rates))
            if utility > best_utility:
                best_utility = utility
                best_threshold = score
            # Results from the last experiment onwards are not part of the
            # hypothetical sweep.
            if idx >= self.last_experiment_idx:
                continue
            update_rates(hypothetical_rates, ground_truth)
            hypothetical_utility = self.utility(
                *self.nprng.dirichlet(hypothetical_rates)
            )
            if hypothetical_utility > best_hypothetical_utility:
                best_hypothetical_utility = hypothetical_utility
                hindsight_utility = utility
        return best_threshold, experiment_utility, best_utility, hindsight_utility

    def __call__(self, ground_truth, score):
        """Args are ignored if previous threshold was not 0. Otherwise, the
        score is added as a potential threshold and ground_truth noted to help
        identify the optimal threshold.

        Args:
            ground_truth (bool)
            score (float)

        Returns:
            The threshold to use next: 0 to explore, otherwise the best
            sampled threshold.
        """
        idx = len(self.results)
        if self.previous_threshold == 0:
            # Exploration round: the observation is unbiased, so record it.
            bisect.insort(self.results, (score, ground_truth, idx))
            self.unbiased_positives += ground_truth
            self.unbiased_negatives += 1 - ground_truth
            self.last_experiment_idx = idx
            self.last_experiment_outcome = ground_truth
        if len(self.results) < 2:
            return self.previous_threshold
        (
            best_threshold,
            experiment_utility,
            best_utility,
            hindsight_utility,
        ) = self.get_best_threshold()
        # NOTE(review): idx is len(self.results), which only grows on
        # exploration rounds, so this multiplier is 0 right after an
        # experiment and 1 otherwise. If it is meant to count the rounds
        # since the last experiment, a separate counter is needed - confirm.
        total_utility_gained = (best_utility - hindsight_utility) * (
            idx - self.last_experiment_idx
        )
        opportunity_cost = hindsight_utility - experiment_utility
        if opportunity_cost <= total_utility_gained:
            self.previous_threshold = 0
        else:
            self.previous_threshold = best_threshold
        return self.previous_threshold
@public.add
def exploration_proportion(thresholds, N):
    """Exponentially smoothed share of rounds spent exploring.

    A round counts as exploration when its threshold equals 0. The first
    value is the plain mean over the first ``N`` rounds; each later round
    updates it as ``previous * alpha + explored * (1 - alpha)`` with
    ``alpha = 1 - 1 / N``.

    Args:
        thresholds (array-like): threshold used in each round.
        N (int): window size that sets the initial mean and the smoothing
            factor.

    Returns:
        list: the initial mean followed by one smoothed value for each
        round after the first ``N``.
    """
    # asarray makes the comparison elementwise for plain sequences too.
    exploration = numpy.asarray(thresholds) == 0
    alpha = 1 - 1.0 / N
    smoothed = [exploration[:N].mean()]
    # Append in place; rebuilding the list on every step is quadratic.
    for explored in exploration[N:]:
        smoothed.append(smoothed[-1] * alpha + explored * (1 - alpha))
    return smoothed