rfc@sTdgZddlZddlZddlZddlZddlZddlmZmZm Z ddl m Z m Z dZ dZdZejZdZd efd YZefed Zdefd YZd efdYZeZdefdYZdefdYZdefdYZdefdYZdS(tPooliN(tProcesst cpu_countt TimeoutError(tFinalizetdebugiiicCs t|S(N(tmap(targs((s,/usr/lib64/python2.7/multiprocessing/pool.pytmapstar@stMaybeEncodingErrorcBs)eZdZdZdZdZRS(sVWraps possible unpickleable errors, so they can be safely sent through the socket.cCsAt||_t||_tt|j|j|jdS(N(treprtexctvaluetsuperR t__init__(tselfR R ((s,/usr/lib64/python2.7/multiprocessing/pool.pyRKscCsd|j|jfS(Ns(Error sending result: '%s'. Reason: '%s'(R R (R((s,/usr/lib64/python2.7/multiprocessing/pool.pyt__str__Ps cCsdt|S(Ns(tstr(R((s,/usr/lib64/python2.7/multiprocessing/pool.pyt__repr__Ts(t__name__t __module__t__doc__RRR(((s,/usr/lib64/python2.7/multiprocessing/pool.pyR Gs  c Cs|dks0t|tkr*|dks0t|j}|j}t|drn|jj|j jn|dk r||nd}x/|dks|r||kry |}Wn"t t fk rt dPnX|dkrt dPn|\} } } } } yt | | | f}Wntk rI}t|f}nXy|| | |fWnMtk r}t||d}t d||| | t|ffnX|d7}qWt d|dS(Nit_writers)worker got EOFError or IOError -- exitingsworker got sentinel -- exitingis0Possible encoding error while sending result: %ssworker exiting after %d tasks(tNonettypetinttAssertionErrortputtgetthasattrRtcloset_readertEOFErrortIOErrorRtTruet ExceptiontFalseR (tinqueuetoutqueuet initializertinitargstmaxtasksRRt completedttasktjobtitfuncRtkwdstresulttetwrapped((s,/usr/lib64/python2.7/multiprocessing/pool.pytworkerXs@0     !    cBseZdZeZdddddZdZdZdZdZ didZ ddZ dd Z dd Z didd Zddd Zed ZedZedZedZdZdZdZdZedZedZRS(sH Class which supports an async version of the `apply()` builtin c Cs)|jtj|_i|_t|_||_||_||_|dkr|y t }Wq|t k rxd}q|Xn|dkrt dn|dk rt |d rtdn||_g|_|jtjdtjd|f|_t|j_t|j_|jjtjdtjd|j|j|j|jf|_t|j_t|j_|jjtjdtjd|j|j|jf|_ t|j _t|j _|j jt!||j"d|j|j#|j|j|j|j|j |jfdd|_$dS( Nis&Number of processes must be at least 1t__call__sinitializer must be a callablettargetRt exitpriorityi(%t _setup_queuestQueuet _taskqueuet_cachetRUNt_statet_maxtasksperchildt _initializert _initargsRRtNotImplementedErrort ValueErrorRt TypeErrort _processest_poolt_repopulate_poolt threadingtThreadRt_handle_workerst_worker_handlerR"tdaemontstartt _handle_taskst _quick_putt _outqueuet _task_handlert_handle_resultst _quick_gett_result_handlerRt_terminate_poolt_inqueuet _terminate(Rt processesR'R(tmaxtasksperchild((s,/usr/lib64/python2.7/multiprocessing/pool.pyRsV                     $          cCswt}xjttt|jD]M}|j|}|jdk r"td||jt }|j|=q"q"W|S(sCleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. scleaning up worker %dN( R$treversedtrangetlenRDtexitcodeRRtjoinR"(RtcleanedR-R3((s,/usr/lib64/python2.7/multiprocessing/pool.pyt_join_exited_workerss"  c Csxt|jt|jD]}|jdtd|j|j|j|j |j f}|jj ||j j dd|_ t|_|jtdqWdS(sBring the number of pool processes up to the specified number, for use after reaping workers which have exited. R5RRt PoolWorkers added workerN(RYRCRZRDRR3RTRNR>R?R=tappendtnametreplaceR"RJRKR(RR-tw((s,/usr/lib64/python2.7/multiprocessing/pool.pyREs#   cCs|jr|jndS(sEClean up any exited workers and start replacements for them. N(R^RE(R((s,/usr/lib64/python2.7/multiprocessing/pool.pyt_maintain_pools cCsPddlm}||_||_|jjj|_|jjj|_ dS(Ni(t SimpleQueue( tqueuesReRTRNRtsendRMRtrecvRQ(RRe((s,/usr/lib64/python2.7/multiprocessing/pool.pyR7s   cCs.|jtkst|j|||jS(s1 Equivalent of `apply()` builtin (R<R;Rt apply_asyncR(RR.RR/((s,/usr/lib64/python2.7/multiprocessing/pool.pytapplyscCs.|jtkst|j|||jS(s/ Equivalent of `map()` builtin (R<R;Rt map_asyncR(RR.titerablet chunksize((s,/usr/lib64/python2.7/multiprocessing/pool.pyRsics|jtkst|dkrft|j|jjfdt|DjfS|dksxtt j ||}t|j|jjfdt|DjfdDSdS(sZ Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()` ic3s0|]&\}}j||fifVqdS(N(t_job(t.0R-tx(R.R0(s,/usr/lib64/python2.7/multiprocessing/pool.pys sc3s0|]&\}}j|t|fifVqdS(N(RnR(RoR-Rp(R0(s,/usr/lib64/python2.7/multiprocessing/pool.pys scss"|]}|D] }|Vq qdS(N((Rotchunktitem((s,/usr/lib64/python2.7/multiprocessing/pool.pys sN( R<R;Rt IMapIteratorR:R9Rt enumeratet _set_lengthRt _get_tasks(RR.RlRmt task_batches((R.R0s,/usr/lib64/python2.7/multiprocessing/pool.pytimaps cs|jtkst|dkrft|j|jjfdt|DjfS|dksxtt j ||}t|j|jjfdt|DjfdDSdS(sK Like `imap()` method but ordering of results is arbitrary ic3s0|]&\}}j||fifVqdS(N(Rn(RoR-Rp(R.R0(s,/usr/lib64/python2.7/multiprocessing/pool.pys sc3s0|]&\}}j|t|fifVqdS(N(RnR(RoR-Rp(R0(s,/usr/lib64/python2.7/multiprocessing/pool.pys scss"|]}|D] }|Vq qdS(N((RoRqRr((s,/usr/lib64/python2.7/multiprocessing/pool.pys sN( R<R;RtIMapUnorderedIteratorR:R9RRtRuRRv(RR.RlRmRw((R.R0s,/usr/lib64/python2.7/multiprocessing/pool.pytimap_unordereds cCsV|jtkstt|j|}|jj|jd|||fgdf|S(s> Asynchronous equivalent of `apply()` builtin N( R<R;Rt ApplyResultR:R9RRnR(RR.RR/tcallbackR0((s,/usr/lib64/python2.7/multiprocessing/pool.pyRi s+cs|jtkstt|ds3t|}n|dkr}tt|t|jd\}}|r}|d7}q}nt|dkrd}nt j |||}t |j |t|||j jfdt|DdfS(s< Asynchronous equivalent of `map()` builtin t__len__iiic3s0|]&\}}j|t|fifVqdS(N(RnR(RoR-Rp(R0(s,/usr/lib64/python2.7/multiprocessing/pool.pys :sN(R<R;RRtlistRtdivmodRZRDRRvt MapResultR:R9RRt(RR.RlRmR|textraRw((R0s,/usr/lib64/python2.7/multiprocessing/pool.pyRk)s ( cCsotj}xB|jtks6|jrP|jtkrP|jtjdqW|j j dt ddS(Ng?sworker handler exiting( RFtcurrent_threadR<R;R:t TERMINATERdttimetsleepR9RRR(tpooltthread((s,/usr/lib64/python2.7/multiprocessing/pool.pyRH>s  * c Cs1tj}xt|jdD]\}}d}xt|D]P\}}|jratdPny||Wq>tk rtdPq>Xq>W|rtd||dqqPqWtdy@td|j dtdx|D]} |dqWWntk r"td nXtd dS( Nis'task handler found thread._state != RUNscould not put task on queuesdoing set_length()istask handler got sentinels/task handler sending sentinel to result handlers(task handler sending sentinel to workerss/task handler got IOError when sending sentinelsstask handler exiting( RFRtiterRRRtR<RR!R( t taskqueueRR&RRttaskseqt set_lengthR-R+tp((s,/usr/lib64/python2.7/multiprocessing/pool.pyRLKs6             cCstj}xy |}Wn"ttfk r@tddSX|jrm|jtks_ttdPn|dkrtdPn|\}}}y||j ||Wqt k rqXqx|ri|jtkriy |}Wn"ttfk rtddSX|dkr+tdqn|\}}}y||j ||Wqt k reqXqWt |drtdy5x.t dD] }|j jsPn|qWWqttfk rqXntdt||jdS( Ns.result handler got EOFError/IOError -- exitings,result handler found thread._state=TERMINATEsresult handler got sentinels&result handler ignoring extra sentinelRs"ensuring that outqueue is not fulli s7result handler exiting: len(cache)=%s, thread._state=%s(RFRR!R RR<RRRt_settKeyErrorRRYRtpollRZ(R&RtcacheRR+R,R-tobj((s,/usr/lib64/python2.7/multiprocessing/pool.pyRPrsX              ccsCt|}x0ttj||}|s1dS||fVqdS(N(Rttuplet itertoolstislice(R.tittsizeRp((s,/usr/lib64/python2.7/multiprocessing/pool.pyRvs  cCstddS(Ns:pool objects cannot be passed between processes or pickled(R@(R((s,/usr/lib64/python2.7/multiprocessing/pool.pyt __reduce__scCs5td|jtkr1t|_t|j_ndS(Ns closing pool(RR<R;tCLOSERI(R((s,/usr/lib64/python2.7/multiprocessing/pool.pyRs  cCs-tdt|_t|j_|jdS(Nsterminating pool(RRR<RIRU(R((s,/usr/lib64/python2.7/multiprocessing/pool.pyt terminates   cCsntd|jttfks%t|jj|jj|jjx|j D]}|jqVWdS(Ns joining pool( RR<RRRRIR\RORRRD(RR((s,/usr/lib64/python2.7/multiprocessing/pool.pyR\s    cCsWtd|jjx9|jrR|jjrR|jjtjdqWdS(Ns7removing tasks from inqueue until task handler finishedi( Rt_rlocktacquiretis_aliveRRRhRR(R%t task_handlerR((s,/usr/lib64/python2.7/multiprocessing/pool.pyt_help_stuff_finishs    c Cstdt|_t|_td|j||t||jsct|dksctt|_|jdtdt j |k r|j dn|rt |ddrtdx-|D]"} | j dkr| jqqWntdt j |k r$|j dntd t j |k rP|j dn|rt |ddrtd x;|D]0} | jrztd | j| j qzqzWndS( Nsfinalizing pools&helping task handler/workers to finishisjoining worker handlerg}Ô%ITRsterminating workerssjoining task handlersjoining result handlersjoining pool workersscleaning up worker %d(RRR<RRZRRRRRFRR\RR[Rtpid( tclsRR%R&Rtworker_handlerRtresult_handlerRR((s,/usr/lib64/python2.7/multiprocessing/pool.pyRSs8    $          N((((RRRRRRR^RERdR7RjRRxRzRiRkt staticmethodRHRLRPRvRRRR\Rt classmethodRS(((s,/usr/lib64/python2.7/multiprocessing/pool.pyRs0  9         ':     R{cBsDeZdZdZdZddZddZdZRS(cCsStjtj|_tj|_||_t|_ ||_ |||js RscBs>eZdZdZddZeZdZdZRS(cCsktjtj|_tj|_||_tj |_ d|_ d|_ i|_|||j#s,        *.-I