多线程Java爬虫——示例代码

1)在SQL Server中建立数据库SE(下面的脚本假定数据库SE已存在),然后在其中创建以下两张表:

-- Switch to the crawler database; USE fails if database SE does not exist yet.
USE SE

-- One row per fetched web page.
-- DOCID has no IDENTITY property, so the writer must supply the value itself.
CREATE TABLE [WebPageInfo]
(
    [DOCID]       int          NOT NULL,   -- page id, primary key
    [URL]         varchar(900) NULL,       -- address the page was fetched from
    [PAGETEXT]    text         NULL,       -- raw page content
    [LENGTH]      int          NULL,
    [TITLE]       varchar(200) NULL,
    [DESCRIPTION] varchar(200) NULL,
    [KEYWORDS]    varchar(200) NULL,
    -- NOTE(review): PR is presumably a page-rank score -- confirm; defaults to 0.
    [PR]          float        NULL CONSTRAINT [DF_WebPageInfo_PR] DEFAULT (0),
    CONSTRAINT [PK_WebPageInfo] PRIMARY KEY CLUSTERED ([DOCID]) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO


-- One row per outgoing link found on a crawled page.
CREATE TABLE [AnchorUrl]
(
    [ANCHORID]    int IDENTITY (1, 1) NOT NULL,   -- server-generated key, starts at 1
    [DOCID]       int          NULL,              -- id of the page that contains the link
    [URL]         varchar(900) NULL,              -- address of the page that contains the link
    [ANCHORDOCID] int          NULL,
    [ANCHORURL]   varchar(900) NULL,              -- address the link points to
    [ANCHORTEXT]  varchar(200) NULL,
    CONSTRAINT [PK_AnchorUrl] PRIMARY KEY CLUSTERED ([ANCHORID]) ON [PRIMARY]
) ON [PRIMARY]
GO

2)Java代码:

import java.io.*;
import java.net.*;
import java.sql.*;
import java.util.*;


/**
 * A small multi-threaded web crawler.
 *
 * Starting from one URL, worker threads repeatedly take a URL from a shared
 * queue, download the page, store it in table WebPageInfo, extract every
 * href="..." target, store each link in table AnchorUrl and queue links that
 * have not been seen before. A worker stops once it has been idle for
 * 60 seconds.
 */
public class Spider
{
	// Literal that introduces a link target in the page text.
	private static final String HREF_PREFIX = "href=\"";

	// A worker exits after this many milliseconds without obtaining work.
	private static final long IDLE_TIMEOUT_MS = 60000;

	// Pause between two polls of an empty queue, so idle workers do not spin.
	private static final long EMPTY_QUEUE_SLEEP_MS = 100;

	// MD5 of every URL that is waiting or already processed (MD5 -> MD5).
	// Only grows; used to recognise URLs that were seen before.
	Hashtable completePages = new Hashtable();

	// URLs still waiting to be processed; grows and shrinks.
	Vector waitingPages = new Vector();

	// Database connection shared by all workers.
	Connection con;

	// Next page id to hand out. Read and replaced only while holding lock.
	Integer docIDCount = Integer.valueOf(1);

	// Number of worker threads to start.
	int threadCount=0;

	// Monitor guarding completePages, waitingPages and docIDCount.
	Object lock=new Object();

	/**
	 * Queues the start URL, opens the database connection and starts crawling.
	 *
	 * @param startURL    first URL to fetch
	 * @param threadCount number of worker threads to start
	 */
	public Spider(String startURL, int threadCount)
	{
		// Seed the in-memory queue and the seen-set.
		synchronized(lock)
		{
			waitingPages.addElement(startURL);
			String startUrlMd5 = MD5(startURL);
			completePages.put(startUrlMd5, startUrlMd5);
		}

		try
		{
			Class.forName("com.microsoft.jdbc.sqlserver.SQLServerDriver");
			// NOTE(review): credentials are hard-coded ("sa", empty password).
			con = DriverManager.getConnection("jdbc:microsoft:sqlserver://localhost:1433;DatabaseName=SE","sa","");
		}
		catch(Exception ex)
		{
			System.out.println(ex.getMessage());
		}

		this.threadCount = threadCount;

		process();
	}

	/**
	 * Core crawl routine: determines the first free page id and starts
	 * threadCount worker threads. Does nothing except print a message when no
	 * database connection is available, because nothing could be stored.
	 */
	public void process()
	{
		if(con == null)
		{
			System.out.println("No database connection; crawl not started.");
			return;
		}

		// The first id is max(docid)+1. getInt returns 0 for SQL NULL, so an
		// empty table yields 1. If the query fails the default of 1 is kept.
		Statement stm = null;
		ResultSet res = null;
		try
		{
			stm = con.createStatement();
			res = stm.executeQuery("select max(docid) from WebPageInfo");
			if(res.next())
			{
				int nextDocId = res.getInt(1) + 1;
				synchronized(lock)
				{
					docIDCount = Integer.valueOf(nextDocId);
				}
			}
		}
		catch(Exception ex)
		{
			System.out.println(ex.getMessage());
		}
		finally
		{
			closeQuietly(res);
			closeQuietly(stm);
		}

		System.out.println("Begin...");
		for(int i = 0; i < threadCount; i++)
		{
			new Thread(new Worker(i)).start();
		}
	}

	/** One crawler thread; owns its prepared statements and its page buffer. */
	private class Worker implements Runnable
	{
		// Thread number, used in log output only.
		private final int num;

		// Inserts one row into WebPageInfo.
		private PreparedStatement pstmWebPageInfo;

		// Inserts one row into AnchorUrl.
		private PreparedStatement pstmAnchorUrl;

		// Reusable buffer for the bytes of the page being downloaded.
		private final ByteArrayOutputStream baos = new ByteArrayOutputStream();

		Worker(int num)
		{
			this.num = num;
		}

		/** Prepares the insert statements, crawls until idle, then releases the statements. */
		public void run()
		{
			System.out.println("Thread "+num+" begin...");
			try
			{
				pstmWebPageInfo = con.prepareStatement("insert into WebPageInfo (docID,URL,PAGETEXT) values(?,?,?)");
				pstmAnchorUrl = con.prepareStatement("insert into AnchorUrl(DOCID,URL,ANCHORURL) values(?,?,?)");
				crawl();
			}
			catch(Exception ex)
			{
				System.out.println(ex.getMessage());
			}
			finally
			{
				closeQuietly(pstmAnchorUrl);
				closeQuietly(pstmWebPageInfo);
			}
		}

		/** Main loop: take a URL, process it, repeat until idle for too long. */
		private void crawl()
		{
			// Idleness is measured from the moment the last page was finished,
			// so a single slow page does not terminate the worker.
			long idleSince = System.currentTimeMillis();

			while(true)
			{
				if(System.currentTimeMillis() - idleSince > IDLE_TIMEOUT_MS)
				{
					System.out.println(num+"*****End*******");
					break;
				}

				String url = null;
				synchronized(lock)
				{
					if(!waitingPages.isEmpty())
						url = (String)waitingPages.remove(0);
				}

				if(url == null)
				{
					// Nothing to do right now; wait briefly instead of spinning.
					try
					{
						Thread.sleep(EMPTY_QUEUE_SLEEP_MS);
					}
					catch(InterruptedException ex)
					{
						Thread.currentThread().interrupt();
						break;
					}
					continue;
				}

				System.out.println("Thread "+num+" Working at "+url);
				fetchAndStore(url);
				idleSince = System.currentTimeMillis();
			}
		}

		/**
		 * Downloads one page and, if its first non-whitespace byte is '<',
		 * stores it and its outgoing links. Other content is skipped.
		 */
		private void fetchAndStore(String url)
		{
			InputStream is = null;
			try
			{
				baos.reset();
				URLConnection urlConnection = new URL(url).openConnection();
				is = new BufferedInputStream(urlConnection.getInputStream());

				// Skip leading whitespace in front of the first tag.
				int oneByte = is.read();
				while(oneByte == ' ' || oneByte == '\t' || oneByte == '\r' || oneByte == '\n')
					oneByte = is.read();

				// Only content that starts with '<' is treated as a page.
				if(oneByte != '<')
					return;

				// read() returns -1 at end of stream; a 0 byte is ordinary data.
				while(oneByte != -1)
				{
					baos.write(oneByte);
					oneByte = is.read();
				}

				String webPageContent = baos.toString();

				int docID = writeToDatabase(url, webPageContent);
				System.out.println("done!docid="+docID+" url="+url);

				analyzeAnchor(docID, url, webPageContent);
			}
			catch(Exception ex)
			{
				System.out.println(ex.getMessage());
			}
			finally
			{
				closeQuietly(is);
			}
		}

		/**
		 * Stores a page in WebPageInfo.
		 *
		 * @return the newly allocated page id (returned even if the insert failed)
		 */
		private int writeToDatabase(String url, String content)
		{
			// Allocate the id under the shared monitor. Locking on docIDCount
			// itself would not work because the field is replaced on every call.
			int newDocID;
			synchronized(lock)
			{
				newDocID = docIDCount.intValue();
				docIDCount = Integer.valueOf(newDocID + 1);
			}

			try
			{
				pstmWebPageInfo.setInt(1,newDocID);
				pstmWebPageInfo.setString(2,url);
				pstmWebPageInfo.setString(3,content);
				pstmWebPageInfo.executeUpdate();
			}
			catch (Exception ex)
			{
				System.out.println(ex.getMessage());
			}

			return newDocID;
		}

		/** Extracts every href="..." target from the page and hands the usable ones to storeAnchor. */
		private void analyzeAnchor(int docID, String url, String content)
		{
			ArrayList otherAnchor = new ArrayList();

			int searchFrom = 0;
			while(true)
			{
				int begin = content.indexOf(HREF_PREFIX, searchFrom);
				if(begin == -1)
					break;

				int valueStart = begin + HREF_PREFIX.length();
				int end = content.indexOf("\"", valueStart);
				if(end == -1)
					break;

				// A link that cannot be resolved is skipped; the remaining
				// links on the page are still processed.
				String resolved = resolveLink(url, content.substring(valueStart, end));
				if(resolved != null)
					otherAnchor.add(resolved);

				searchFrom = end;
			}

			storeAnchor(docID, url, otherAnchor);
		}

		/** Stores each link in AnchorUrl and queues the ones not seen before. */
		private void storeAnchor(int docID, String url, ArrayList anchors)
		{
			for(int i = 0; i < anchors.size(); i++)
			{
				String anchorURL = (String)anchors.get(i);
				try
				{
					pstmAnchorUrl.setInt(1,docID);
					pstmAnchorUrl.setString(2,url);
					pstmAnchorUrl.setString(3,anchorURL);
					pstmAnchorUrl.executeUpdate();
				}
				catch (Exception ex)
				{
					System.out.println(ex.getMessage());
				}

				String urlMd5 = MD5(anchorURL);
				if(urlMd5 == null)
					continue;

				// No host filter is applied: every link not seen before is queued.
				synchronized(lock)
				{
					if(!completePages.containsKey(urlMd5))
					{
						waitingPages.addElement(anchorURL);
						completePages.put(urlMd5, urlMd5);
					}
				}
			}
		}
	}

	/**
	 * Resolves a link target against the URL of the page it was found on.
	 *
	 * @return the absolute http/https URL, or null when the target is empty,
	 *         malformed, or uses another scheme (mailto:, javascript:, ftp:, ...)
	 */
	private static String resolveLink(String pageUrl, String href)
	{
		String target = href.trim();
		if(target.length() == 0)
			return null;

		try
		{
			// URL(context, spec) handles absolute, root-relative and relative
			// targets for any base URL.
			URL resolved = new URL(new URL(pageUrl), target);
			String protocol = resolved.getProtocol();
			if(!"http".equalsIgnoreCase(protocol) && !"https".equalsIgnoreCase(protocol))
				return null;
			return resolved.toString();
		}
		catch(MalformedURLException ex)
		{
			return null;
		}
	}

	// Closes a result set, ignoring null and close failures.
	private static void closeQuietly(ResultSet res)
	{
		if(res == null)
			return;
		try
		{
			res.close();
		}
		catch(Exception ex)
		{
			System.out.println(ex.getMessage());
		}
	}

	// Closes a statement, ignoring null and close failures.
	private static void closeQuietly(Statement stm)
	{
		if(stm == null)
			return;
		try
		{
			stm.close();
		}
		catch(Exception ex)
		{
			System.out.println(ex.getMessage());
		}
	}

	// Closes an input stream, ignoring null and close failures.
	private static void closeQuietly(InputStream is)
	{
		if(is == null)
			return;
		try
		{
			is.close();
		}
		catch(Exception ex)
		{
			System.out.println(ex.getMessage());
		}
	}

	/**
	 * Computes the MD5 digest of a string.
	 *
	 * @param s text to hash; its bytes are taken in the platform default charset
	 * @return the digest as 32 lower-case hex digits, or null if hashing failed
	 */
	public String MD5(String s)
	{
		char hexDigits[] ={ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
		try
		{
			java.security.MessageDigest mdTemp = java.security.MessageDigest.getInstance("MD5");
			mdTemp.update(s.getBytes());
			byte[] md = mdTemp.digest();

			char str[] = new char[md.length * 2];
			int k = 0;
			for (int i = 0; i < md.length; i++)
			{
				byte byte0 = md[i];
				// High nibble first, then low nibble.
				str[k++] = hexDigits[byte0 >>> 4 & 0xf];
				str[k++] = hexDigits[byte0 & 0xf];
			}
			return new String(str);
		}
		catch (Exception e)
		{
			return null;
		}
	}

	/** Starts a crawl of a fixed start URL with 10 worker threads. */
	public static void main(String args[])
	{
		Spider sp=new Spider("http://www.wdyd.com.cn/blog/",10);
	}
}

项目所需的 SQL Server 2000 JDBC 连接驱动 Jar 包下载:2kjdbc

发表评论

邮箱地址不会被公开。 必填项已用*标注