package Thread::Queue;
use strict;
use warnings;
our $VERSION = '2.11';
use threads::shared 1.21;
use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
# Carp errors from threads::shared calls should complain about caller
our @CARP_NOT = ("threads::shared");
# Predeclarations for internal functions
my ($validate_count, $validate_index);
# Create a new queue possibly pre-populated with items
sub new
{
my $class = shift;
my @queue :shared = map { shared_clone($_) } @_;
return bless(\@queue, $class);
}
# Add items to the tail of a queue
sub enqueue
{
my $queue = shift;
lock(@$queue);
push(@$queue, map { shared_clone($_) } @_)
and cond_signal(@$queue);
}
# Return a count of the number of items on a queue
sub pending
{
my $queue = shift;
lock(@$queue);
return scalar(@$queue);
}
# Return 1 or more items from the head of a queue, blocking if needed
sub dequeue
{
my $queue = shift;
lock(@$queue);
my $count = @_ ? $validate_count->(shift) : 1;
# Wait for requisite number of items
cond_wait(@$queue) until (@$queue >= $count);
cond_signal(@$queue) if (@$queue > $count);
# Return single item
return shift(@$queue) if ($count == 1);
# Return multiple items
my @items;
push(@items, shift(@$queue)) for (1..$count);
return @items;
}
# Return items from the head of a queue with no blocking
sub dequeue_nb
{
my $queue = shift;
lock(@$queue);
my $count = @_ ? $validate_count->(shift) : 1;
# Return single item
return shift(@$queue) if ($count == 1);
# Return multiple items
my @items;
for (1..$count) {
last if (! @$queue);
push(@items, shift(@$queue));
}
return @items;
}
# Return an item without removing it from a queue
sub peek
{
my $queue = shift;
lock(@$queue);
my $index = @_ ? $validate_index->(shift) : 0;
return $$queue[$index];
}
# Insert items anywhere into a queue
sub insert
{
my $queue = shift;
lock(@$queue);
my $index = $validate_index->(shift);
return if (! @_); # Nothing to insert
# Support negative indices
if ($index < 0) {
$index += @$queue;
if ($index < 0) {
$index = 0;
}
}
# Dequeue items from $index onward
my @tmp;
while (@$queue > $index) {
unshift(@tmp, pop(@$queue))
}
# Add new items to the queue
push(@$queue, map { shared_clone($_) } @_);
# Add previous items back onto the queue
push(@$queue, @tmp);
# Soup's up
cond_signal(@$queue);
}
# Remove items from anywhere in a queue
sub extract
{
my $queue = shift;
lock(@$queue);
my $index = @_ ? $validate_index->(shift) : 0;
my $count = @_ ? $validate_count->(shift) : 1;
# Support negative indices
if ($index < 0) {
$index += @$queue;
if ($index < 0) {
$count += $index;
return if ($count <= 0); # Beyond the head of the queue
return $queue->dequeue_nb($count); # Extract from the head
}
}
# Dequeue items from $index+$count onward
my @tmp;
while (@$queue > ($index+$count)) {
unshift(@tmp, pop(@$queue))
}
# Extract desired items
my @items;
unshift(@items, pop(@$queue)) while (@$queue > $index);
# Add back any removed items
push(@$queue, @tmp);
# Return single item
return $items[0] if ($count == 1);
# Return multiple items
return @items;
}
### Internal Functions ###
# Check value of the requested index
$validate_index = sub {
my $index = shift;
if (! defined($index) ||
! looks_like_number($index) ||
(int($index) != $index))
{
require Carp;
my ($method) = (caller(1))[3];
$method =~ s/Thread::Queue:://;
$index = 'undef' if (! defined($index));
Carp::croak("Invalid 'index' argument ($index) to '$method' method");
}
return $index;
};
# Check value of the requested count
$validate_count = sub {
my $count = shift;
if (! defined($count) ||
! looks_like_number($count) ||
(int($count) != $count) ||
($count < 1))
{
require Carp;
my ($method) = (caller(1))[3];
$method =~ s/Thread::Queue:://;
$count = 'undef' if (! defined($count));
Carp::croak("Invalid 'count' argument ($count) to '$method' method");
}
return $count;
};
1;
=head1 NAME
Thread::Queue - Thread-safe queues
=head1 VERSION
This document describes Thread::Queue version 2.11
=head1 SYNOPSIS
use strict;
use warnings;
use threads;
use Thread::Queue;
my $q = Thread::Queue->new(); # A new empty queue
# Worker thread
my $thr = threads->create(sub {
while (my $item = $q->dequeue()) {
# Do work on $item
}
})->detach();
# Send work to the thread
$q->enqueue($item1, ...);
# Count of items in the queue
my $left = $q->pending();
# Non-blocking dequeue
if (defined(my $item = $q->dequeue_nb())) {
# Work on $item
}
# Get the second item in the queue without dequeuing anything
my $item = $q->peek(1);
# Insert two items into the queue just behind the head
$q->insert(1, $item1, $item2);
# Extract the last two items on the queue
my ($item1, $item2) = $q->extract(-2, 2);
=head1 DESCRIPTION
This module provides thread-safe FIFO queues that can be accessed safely by
any number of threads.
Any data types supported by L