一聚教程网:一个值得你收藏的教程网站

热门教程

php与python 线程池多线程爬虫的例子

时间:2022-06-24 17:45:40 编辑:袖梨 来源:一聚教程网

php例子

 
class Connect extends Worker  //worker模式
{
 
public function __construct()
{
 
}
 
public function getConnection()
{
if (!self::$ch)
{
self::$ch = curl_init();
curl_setopt(self::$ch, CURLOPT_TIMEOUT, 2);
curl_setopt(self::$ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt(self::$ch, CURLOPT_HEADER, 0);
curl_setopt(self::$ch, CURLOPT_NOSIGNAL, true);
curl_setopt(self::$ch, CURLOPT_USERAGENT, "Firefox");
curl_setopt(self::$ch, CURLOPT_FOLLOWLOCATION, 1);
}
 
/* do some exception/error stuff here maybe */
 
return self::$ch;
}
 
public function closeConnection()
{
curl_close(self::$ch);
}
 
/**
* Note that the link is stored statically, which for pthreads, means thread local
* */
protected static $ch;
 
}
 
class Query extends Threaded
{
 
public function __construct($url)
{
$this->url = $url;
}
 
public function run()
{
$ch = $this->worker->getConnection();
curl_setopt($ch, CURLOPT_URL, $this->url);
$page = curl_exec($ch);
$info = curl_getinfo($ch);
$error = curl_error($ch);
$this->deal_data($this->url, $page, $info, $error);
 
$this->result = $page;
}
 
function deal_data($url, $page, $info, $error)
{
$parts = explode(".", $url);
 
$id = $parts[1];
if ($info['http_code'] != 200)
{
$this->show_msg($id, $error);
} else
{
$this->show_msg($id, "OK");
}
}
 
function show_msg($id, $msg)
{
echo $id."t$msgn";
}
 
public function getResult()
{
return $this->result;
}
 
protected $url;
protected $result;
 
}
 
function check_urls_multi_pthreads()
{
global $check_urls;  //定义抓取的连接
$check_urls = array( 'http://xxx.com' => "xx网",);
$pool = new Pool(10, "Connect", array()); //建立10个线程池
foreach ($check_urls as $url => $name)
{
$pool->submit(new Query($url));
}
$pool->shutdown();
}
 
check_urls_multi_pthreads();


python 多线程


def handle(sid)://这个方法内执行爬虫数据处理
pass
class MyThread(Thread):
"""docstring for ClassName"""
def __init__(self, sid):
Thread.__init__(self)
self.sid = sid
 
def run():
handle(self.sid)
 
threads = []
for i in xrange(1,11):
t = MyThread(i)
threads.append(t)
t.start()
 
for t in threads:
t.join()


python 线程池爬虫


from queue import Queue
from threading import Thread, Lock
import urllib.parse
import socket
import re
import time
 
seen_urls = set(['/'])
lock = Lock()
 
 
class Fetcher(Thread):
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
 
        self.start()
 
    def run(self):
        while True:
            url = self.tasks.get()
            print(url)
            sock = socket.socket()
            sock.connect(('localhost', 3000))
            get = 'GET {} HTTP/1.0rnHost: localhostrnrn'.format(url)
            sock.send(get.encode('ascii'))
            response = b''
            chunk = sock.recv(4096)
            while chunk:
                response += chunk
                chunk = sock.recv(4096)
 
            links = self.parse_links(url, response)
 
            lock.acquire()
            for link in links.difference(seen_urls):
                self.tasks.put(link)
            seen_urls.update(links)   
            lock.release()
 
            self.tasks.task_done()
 
    def parse_links(self, fetched_url, response):
        if not response:
            print('error: {}'.format(fetched_url))
            return set()
        if not self._is_html(response):
            return set()
        urls = set(re.findall(r'''(?i)href=["']?([^s"'<>]+)''',
                              self.body(response)))
 
        links = set()
        for url in urls:
            normalized = urllib.parse.urljoin(fetched_url, url)
            parts = urllib.parse.urlparse(normalized)
            if parts.scheme not in ('', 'http', 'https'):
                continue
            host, port = urllib.parse.splitport(parts.netloc)
            if host and host.lower() not in ('localhost'):
                continue
            defragmented, frag = urllib.parse.urldefrag(parts.path)
            links.add(defragmented)
 
        return links
 
    def body(self, response):
        body = response.split(b'rnrn', 1)[1]
        return body.decode('utf-8')
 
    def _is_html(self, response):
        head, body = response.split(b'rnrn', 1)
        headers = dict(h.split(': ') for h in head.decode().split('rn')[1:])
        return headers.get('Content-Type', '').startswith('text/html')
 
 
class ThreadPool:
    def __init__(self, num_threads):
        self.tasks = Queue()
        for _ in range(num_threads):
            Fetcher(self.tasks)
 
    def add_task(self, url):
        self.tasks.put(url)
 
    def wait_completion(self):
        self.tasks.join()
 
if __name__ == '__main__':
    start = time.time()
    pool = ThreadPool(4)
    pool.add_task("/")
    pool.wait_completion()
    print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))

热门栏目