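I'm new to AnyEvent programming, but I've done a lot of asynchronous calls with callbacks in Perl and JavaScript/jQuery. Those made sense to me, but it just isn't clicking for me with AnyEvent and Mech.
Here's the code I'm working on; it pulls URLs from an upstream queue. Given a URL, I want to fetch the page, get a list of all the images on it, and then grab all of those images asynchronously.
So the pseudocode looks like this: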
1. Grab a URL from the queue
2. Get the page
3. Get all the img URL links
4. Do many async calls on the img URLs (for example, to store the imgs in a backend)
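I've read (after researching my errors) that I can't block inside an AnyEvent callback. How do I structure my program to make the async calls without blocking?
AE events are only processed when an AE-aware function blocks, so I'm using LWP::Protocol::AnyEvent::http. It replaces LWP's normal HTTP backend (Net::HTTP) with AnyEvent::HTTP, which is AE-aware.
The worker is created like this: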
    my $worker = Worker->new(upstream_job_url => URI->new('tcp://127.0.0.1:5555'), run_on_create => 1);
The async part is sub _recv_msg, which calls _proc_msg.
I already have an AnyEvent loop watching the ZeroMQ socket, per the ZeroMQ Perl binding docs…
Any help is much appreciated!
Code:
    package Worker;
    use 5.12.0;
    use Moose;
    use AnyEvent;
    use LWP::Protocol::AnyEvent::http;
    use ZMQ::LibZMQ3;
    use ZMQ::Constants qw/ZMQ_PUSH ZMQ_PULL ZMQ_POLLIN ZMQ_FD/;
    use JSON;
    use URI;
    use WWW::Mechanize;
    use Carp;
    use Coro;

    has 'max_children' => (
        is       => 'rw',
        isa      => 'Int',
        required => 1,
        default  => sub { 0 }
    );
    has 'upstream_job_url' => (
        is  => 'rw',
        isa => 'URI',
    );
    has ['uri', 'sink_url'] => (
        is       => 'rw',
        required => 0,
    );
    has 'run_on_create' => (
        is      => 'rw',
        isa     => 'Bool',
        default => sub { 1 }
    );
    has '_receiver' => (
        is       => 'rw',
        isa      => 'ZMQ::LibZMQ3::Socket',
        required => 0
    );

    sub BUILD {
        my $self = shift;
        $self->start if $self->run_on_create;
    }

    sub start
    {
        my $self = shift;
        $self->_init_zmq();
        # Watch the socket's file descriptor and read messages when it is readable
        my $fh = zmq_getsockopt( $self->_receiver, ZMQ_FD );
        my $w;
        $w = AnyEvent->io( fh => $fh, poll => "r", cb => sub { $self->_recv_msg } );
        AnyEvent->condvar->recv;
    }

    sub _init_zmq
    {
        my $self = shift;
        my $c = zmq_init() or die "zmq_init: $!\n";
        my $recv = zmq_socket($c, ZMQ_PULL) or die "zmq_socket: $!\n";
        if( zmq_connect($recv, $self->upstream_job_url) != 0 ) {
            croak "zmq_connect: $!\n";
        }
        $self->_receiver($recv);
    }

    sub _recv_msg
    {
        my $self = shift;
        while(my $message = zmq_msg_data(zmq_recvmsg($self->_receiver)) ) {
            my $msg = JSON::from_json($message, {utf8 => 1});
            $self->uri(URI->new($msg->{url}));
            $self->_proc_msg;
        }
    }

    sub _proc_msg
    {
        my $self = shift;
        my $c = async {
            my $ua = WWW::Mechanize->new;
            $ua->protocols_allowed(['http']);
            print "$$ processing " . $self->uri->as_string . "... ";
            $ua->get($self->uri->as_string);
            if ($ua->success()) {
                say $ua->status . " OK";
            }
            else {
                say $ua->status . " NOT OK";
            }
        };
        $c->join;    # waits for the coro from inside the event loop callback
    }
    1;
As you can see, I'm trying Coro in _proc_msg. I also tried doing just the Mech calls directly, but that fails with
AnyEvent::CondVar: recursive blocking wait attempted at lib/Worker.pm line 91.
because $mech is still blocking inside the callback. I'm not sure how to make Mech calls correctly from within my callback.
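For what it's worth, that error can be reproduced in isolation. The sketch below is not taken from the code above; it uses only AnyEvent timers, and the timers and the 'page body' string are made-up stand-ins for a real fetch. It shows that calling recv on an unfulfilled condvar from inside a callback croaks, whereas registering a callback with cb and returning lets the loop carry on.

```perl
use strict;
use warnings;
use AnyEvent;

my $outer = AnyEvent->condvar;    # the top-level wait, like the one in start()
my ($error, $result);

my $timer = AnyEvent->timer(after => 0.05, cb => sub {
    # A second condvar that only a later event will fulfil
    # (stand-in for "the HTTP response has arrived").
    my $inner = AnyEvent->condvar;
    my $later;
    $later = AnyEvent->timer(after => 0.05, cb => sub {
        undef $later;
        $inner->send('page body');
    });

    # Wrong: waiting here tries to re-enter the event loop from a callback.
    eval { $inner->recv; 1 } or $error = $@;

    # Better: register a callback and return to the loop immediately.
    $inner->cb(sub {
        $result = $_[0]->recv;    # already fulfilled, so this does not block
        $outer->send;
    });
});

$outer->recv;
print "error:  $error";
print "result: $result\n";
```

The eval is only there so the sketch can keep going after the failed wait; in real code the point is to never call a blocking recv from a callback in the first place.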
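At ikegami's request, I've added the driver program that sends the URLs. For testing purposes, I just read an RSS feed and send the links to the workers to try to process. I'm curious about the basic structure of AnyEvent programs with callbacks in general, but I'd be happy just to get help with this program. Here's the driver code: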
    #!/usr/local/bin/perl
    use strict;
    use warnings;
    use v5.12.0;
    use lib './lib';
    use Config::General;
    use Getopt::Long;
    use Carp;
    use AnyEvent;
    use AnyEvent::Feed;
    use Parallel::ForkManager;
    use ZMQ::LibZMQ3;
    use ZMQ::Constants qw(ZMQ_PUSH ZMQ_PULL);
    use Worker;

    # DEBUG
    use Data::Dumper;
    $Data::Dumper::Deparse = 1;

    my $config_file = "Feeds.cfg";
    GetOptions(
        "--config|c=s" => \$config_file,    # takes a string value, stored by reference
        "--help|h"     => sub { usage(); exit(0); }
    );

    sub usage() {
        say "Todo";
    }

    $SIG{INT}  = sub { croak; };
    $SIG{TERM} = sub { croak; };
    $SIG{CHLD} = 'IGNORE';

    my $conf = Config::General->new($config_file)
        or croak "Couldn't open config file '$config_file' $!\n";
    my %config  = $conf->getall();
    my @readers = ();
    my @feeds   = load_feeds(\%config);
    my $mgr = Parallel::ForkManager->new( $config{'max_download_children'} )
        or croak "Can't create fork manager: $!\n";
    my $context = zmq_init() or croak "zmq_init: $!\n";
    my $sender  = zmq_socket($context, ZMQ_PUSH) or die "zmq_socket: $!\n";

    foreach my $feed_cfg (@feeds) {
        my $reader = AnyEvent::Feed->new(url => delete $feed_cfg->{url}, %$feed_cfg);
        push(@readers, $reader);    # save, don't go out of scope
    }

    # Fork Downloader children. These processes will look for incoming data
    # in the img_queue and download the images, storing them in nosql
    for ( 1 .. $config{'max_download_children'} ) {
        my $pid = $mgr->start;
        if (!$pid) {
            # Child
            my $worker = Worker->new({ upstream_job_url => URI->new('tcp://127.0.0.1:5555') });
            $mgr->finish;
            say "$$ exiting.";
            exit(0);
        }
        else {
            # Parent
            say "[forked child $pid] my pid is $$";
        }
    }

    if (zmq_bind($sender, 'tcp://127.0.0.1:5555') < 0) {
        croak "zmq_bind: $!\n";
    }

    # Event loop
    AnyEvent->condvar->recv;

    sub load_feeds
    {
        my $conf  = shift;
        my @feeds = ();
        foreach my $feed ( keys %{$conf->{'Feeds'}} ) {
            my $feed_ref = $conf->{'Feeds'};
            $feed_ref->{$feed}->{'name'}     = $feed;
            $feed_ref->{$feed}->{'on_fetch'} = \&fetch_feed_cb;
            push(@feeds, $feed_ref->{$feed});
        }
        return @feeds;
    }

    sub fetch_feed_cb
    {
        my ($feed_reader, $new_entries, $feed, $error) = @_;
        if (defined $error) {
            say "Error fetching feed: $error";
            return;
        }
        say "$$ checking for new feeds";
        for (@$new_entries) {
            my ($hash, $entry) = @$_;
            say "$$ sending " . $entry->link;
            zmq_send($sender, JSON::to_json( { url => $entry->link }, { pretty => 1, utf8 => 1 } ));
        }
    }
Here's a sample run:
    [forked child 40790] my pid is 40789
    [forked child 40791] my pid is 40789
    [forked child 40792] my pid is 40789
    40789 checking for new feeds
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/f5nNM3zYBt0/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/Ay9V5pIpFBA/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/5XCVvt75ppU/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/mWprjBD3UhM/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/NngMs9pCQew/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/wiUsvafLGFU/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/QMp6gnZpFcA/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/kqUb_rpU5dE/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/tHItKqKhGXg/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/7LleQbVnPmE/
    FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99.
    FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99.
    FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99.
    40791 processing http://feedproxy.google.com/~r/PerlNews/~3/Ay9V5pIpFBA/... 
    40790 processing http://feedproxy.google.com/~r/PerlNews/~3/f5nNM3zYBt0/... 
    40792 processing http://feedproxy.google.com/~r/PerlNews/~3/5XCVvt75ppU/... 
    ^C at /usr/local/perls/perl5162/lib/perl5/site_perl/darwin-thread-multi-2level/AnyEvent/Loop.pm line 231.
If I don't explicitly `use Coro;` in Worker.pm, the Coro FATAL errors don't show up. I don't know how async was working before without further runtime errors.
Sample config file (Feeds.cfg):
    max_download_children = 3
    <Feeds>
        <Feed1>
            url="http://feeds.feedburner.com/PerlNews?format=xml"
            interval=60
        </Feed1>
    </Feeds>
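So I spent a little time on this today. The mistake was the way I was calling $c->join. I shouldn't do that, because I can't block inside a callback. Coro will schedule the async block, and it will finish whenever it finishes. The only thing I need to make sure of is that I somehow know when all the asyncs are done, which I think I can figure out. Now the tricky part is trying to figure out this little mystery: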
    sub _recv_msg
    {
        my $self = shift;
        while(my $message = zmq_msg_data(zmq_recvmsg($self->_receiver)) ) {
            my $msg = JSON::from_json($message, {utf8 => 1});
            $self->uri(URI->new($msg->{url}));
            $self->_proc_msg;
        }
    }
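This while loop causes the async {} threads in _proc_msg to NOT RUN. Remove the while loop and just handle the first msg, and the coros run. Keep the while loop and they never run. It's strange to me, and I haven't figured out why yet.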
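On the earlier point about dropping $c->join and still knowing when every async has finished, here is a minimal sketch of one way to do it, assuming Coro and Coro::AnyEvent are installed. It counts outstanding jobs with a condvar's begin/end, and Coro::AnyEvent::sleep stands in for the real page fetch; the job ids and delays are invented for illustration.

```perl
use strict;
use warnings;
use AnyEvent;
use Coro;
use Coro::AnyEvent;

my $all_done = AnyEvent->condvar;
my @finished;

for my $id (1 .. 3) {
    $all_done->begin;                        # one more job outstanding
    async {
        Coro::AnyEvent::sleep 0.1 * $id;     # stand-in for fetching a page
        push @finished, $id;
        $all_done->end;                      # the last end() fulfils the condvar
    };
    # No join here: the coro runs once control goes back to the scheduler.
}

# Waiting at the top level is fine; only callbacks must not block.
$all_done->recv;
print "finished jobs: @finished\n";
```

Inside a worker, the same begin/end pair would wrap each async started from the socket callback, and the callback itself would simply return after starting the coro.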
Further update:
zmq_msg_recv was blocking. Also, zmq_send in the parent can block. Must use ZMQ_NOBLOCK.
I split the worker and main into completely separate programs.
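To illustrate the non-blocking receive mentioned in the update, here is a rough sketch of the loop shape. The helper name drain_nonblocking and the in-memory queue are invented so the example runs without a socket. The commented-out receiver shows how it might be wired to ZMQ::LibZMQ3, which is an untested assumption on my part; in libzmq 3 the flag is called ZMQ_DONTWAIT, and ZMQ_NOBLOCK is the older name for it.

```perl
use strict;
use warnings;

# Keep pulling messages until the receiver reports "nothing available right
# now" (undef), then return so the event loop can run other callbacks.
sub drain_nonblocking {
    my ($try_recv, $on_message) = @_;
    my $handled = 0;
    while (defined(my $payload = $try_recv->())) {
        $on_message->($payload);
        $handled++;
    }
    return $handled;
}

# With ZMQ::LibZMQ3 the receiver might look like this (assumption: $socket is
# the ZMQ_PULL socket and ZMQ_DONTWAIT is imported from ZMQ::Constants):
#
#   my $try_recv = sub {
#       my $msg = zmq_recvmsg($socket, ZMQ_DONTWAIT) or return;
#       return zmq_msg_data($msg);
#   };

# Stand-in receiver: an in-memory queue of JSON payloads.
my @pending = (
    '{"url":"http://example.com/a"}',
    '{"url":"http://example.com/b"}',
);
my $try_recv = sub { return shift @pending };

my @seen;
my $count = drain_nonblocking($try_recv, sub { push @seen, $_[0] });
print "handled $count message(s)\n";
```

Checking the message before calling zmq_msg_data also avoids passing an undefined value to it when nothing is waiting on the socket.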
    # Fetch every URL concurrently; the condvar counts outstanding requests
    use AnyEvent::HTTP::LWP::UserAgent;
    use AnyEvent;

    my $ua   = AnyEvent::HTTP::LWP::UserAgent->new;
    my @urls = (...);
    my $cv   = AE::cv;

    $cv->begin;
    foreach my $url (@urls) {
        $cv->begin;
        $ua->get_async($url)->cb(sub {
            my $r = shift->recv;
            print "url $url, content " . $r->content . "\n";
            $cv->end;
        });
    }
    $cv->end;
    $cv->recv;