跨度内存计算公式式______至少需要运行内存多少

1. 1排序排序是数据处理中经常使用的一种重要运算, 如何进行排序, 特别是如何进行高效的排 序,是计算机应用中的重要课题。排序的对象一般是一组记录组成的文件,而记录则是由若 干数据项组成, 其中的一项可用来标志一个记录, 称为关键字项, 该数据项的值称为关键字。 所谓排序,就是要整理文件中的记录,使得它按关键字递增(或递减)的次序排列起来。若 给定的文件含有n个记录{R 1 ,R 2 ,…,R n } ,它们的关键字分别为{K 1 ,K 2 ,…,K n } , ,使得{K i1 ≥K i2 ≥…≥K in } (或{K i1 要把这n个记录重新排列成为{R i1 ,R i2 ,…,R in } ≤K i2 ≤…≤K in } ) 。 本章主要介绍了枚举排序、快速排序、PSRS 排序算法以及它们的 MPI 编程实现。1.1 枚举排序1.1.1 枚举排序及其串行算法枚举排序 (Enumeration Sort) 是一种最简单的排序算法, 通常也称为秩排序 (Rank Sort) 。 该算法的具体思想是(假设按关键字递增排序) ,对每一个待排序的元素统计小于它的所有 元素的个数,从而得到该元素最终处于序列中的位置。假定待排序的n个数存在a[1]…a[n] 中。首先将a[1]与a[2]…a[n]比较,记录比其小的数的个数,令其为k,a[1]就被存入有序的数 组b[1]…b[n]的b[k+1]位置上;然后将a[2]与a[1],a[3]…a[n]比较,记录比其小的数的个数, 依此类推。这样的比较操作共n(n-1)次,所以串行秩排序的时间复杂度为O(n2) 。 算法 13.1 枚举排序串行算法 输入:a[1]…a[n] 输出:b[1]…b[n] Begin for i=1 to n do (1) k=1 (2) for j=1 to n do if a[i]&a[j] then k=k+1 end if end for (3) b[k]= a[i] end for End1.1.2 枚举排序的并行算法对该算法的并行化是很简单的, 假设对一个长为 n 的输入序列使用 n 个处理器进行排序, 只需是每个处理器负责完成对其中一个元素的定位,然后将所有的定位信息集中到主进程 中,由主进程负责完成所有元素的最终排位。该并行算法描述如下: 算法 13.2 枚举排序并行算法1 输入:无序数组 a[1]…a[n] 输出:有序数组 b[1]…b[n] Begin (1) P 0 播送a[1]…a[n]给所有P i (2) for all P i where 1≤i≤n para-do (2.1) k=1 (2.2) for j = 1 to n do if (a[i] & a[j]) or (a[i] = a[j] and i&j) then k = k+1 end if end for (3) P 0 收集k并按序定位 End 在该并行算法中,使用了n个处理器,由于每个处理器定位一个元素,所以步骤⑵的时 间复杂度为O(n) ;步 骤 ⑶中主进程完成的数组元素重定位操作的时间复杂度为O(n) ,通 2 信复杂度分别为O(n) ;同 时 ⑴中的通信复杂度为O(n ) ;所以总的计算复杂度为O(n) , 2 总的通信复杂度为O(n ) 。 MPI 源程序请参见所附光盘。1.2 快速排序1.2.1 快速排序及其串行算法快速排序(Quick Sort)是一种最基本的排序算法,它的基本思想是:在当前无序区 R[1,n]中取一个记录作为比较的“基准” (一般取第一个、最后一个或中间位置的元素) , 用此基准将当前的无序区 R[1,n]划分成左右两个无序的子区 R[1,i-1]和 R[i,n](1≤i≤ n), 且左边的无序子区中记录的所有关键字均小于等于基准的关键字, 右边的无序子区中记 录的所有关键字均大于等于基准的关键字;当 R[1,i-1]和 R[i,n]非空时,分别对它们重 复上述的划分过程,直到所有的无序子区中的记录均排好序为止。 算法 13.3 单处理机上快速排序算法 输入:无序数组 data[1,n] 输出:有序数组 data[1,n] Begin call procedure quicksort(data,1,n) End procedure quicksort(data,i,j) Begin (1) if (i&j) then (1.1)r = partition(data,i,j) (1.2)quicksort(data,i,r-1); (1.3)quicksort(data,r+1,j); end if End2 procedure partition(data,k,l) Begin (1) pivo=data[l] (2) i=k-1 (3) for j=k to l-1 do if data[j]≤pivo then i=i+1 exchange data[i] and data[j] end if end for (4) exchange data[i+1] and data[l] (5) return i+1 End 快速排序算法的性能主要决定于输入数组的划分是否均衡, 而这与基准元素的选择密切 相关。在最坏的情况下,划分的结果是一边有n-1 个元素,而另一边有 0 个元素(除去被选 中的基准元素) 。如果每次递归排序中的划分都产生这种极度的不平衡,那么整个算法的复 杂度将是Θ(n2) 。在最好的情况下,每次划分都使得输入数组平均分为两半,那么算法的 复杂度为O(nlogn) 。在一般的情况下该算法仍能保持O(nlogn)的复杂度,只不过其具有 更高的常数因子。1.2.2 快速排序的并行算法快速排序算法并行化的一个简单思想是, 对每次划分过后所得到的两个序列分别使用两 个处理器完成递归排序。例如对一个长为n的序列,首先划分得到两个长为n/2 的序列,将其 交给两个处理器分别处理;而后进一步划分得到四个长为n/4 的序列,再分别交给四个处理 器处理;如此递归下去最终得到排序好的序列。当然这里举的是理想的划分情况,如果划分 步骤不能达到平均分配的目的,那么排序的效率会相对较差。算法 13.4 中描述了使用 2m个 处理器完成对n个输入数据排序的并行算法。 算法 13.4 快速排序并行算法 输入:无序数组data[1,n],使用的处理器个数 2m 输出:有序数组 data[1,n] Begin para_quicksort(data,1,n,m,0) End procedure para_quicksort(data,i,j,m,id) Begin (1) if (j-i)≤k or m=0 then (1.1) P id call quicksort(data,i,j) else (1.2) P id: r=patrition(data,i,j) (1.3) P id send data[r+1,m-1] to P id+2 m-1 (1.4) para_quicksort(data,i,r-1,m-1,id) (1.5) para_quicksort(data,r+1,j,m-1,id+2m-1)3 (1.6) P id+2 m-1 send data[r+1,m-1] back to P id end if End 在最优的情况下该并行算法形成一个高度为logn的排序树,其计算复杂度为O(n) , 通 2 。正常情况 信复杂度也为O(n) 。同串行算法一样,在最坏情况下其计算复杂度降为O(n ) 下该算法的计算复杂度也为O(n) ,只不过具有更高的常数因子。 MPI 源程序请参见所附光盘。1.3 PSRS 排序1.3.1 PSRS 算法原理并行正则采样排序,简称 PSRS(Parallel Sorting by Regular Sampling)它是一种基 于均匀划分(Uniform Partition)原理的负载均衡的并行排序算法。假定待排序的元素有 n 个,系统中有 p 个处理器,那么系统首先将 n 个元素均匀地分割成 p 段,每段含有 n/p 个 元素,每段指派一个处理器,然后各个处理器同时施行局部排序。为了使各段中诸局部有序 的元素在整个序列中也能占据正确的位置, 那么就首先从各段中抽取几个代表元素, 再从他 们产生出 p-1 个主元, 然后按这些主元与原各局部有序中的元素之间的偏序关系, 将各个局 部有序段划分成 p 段, 接着通过全局交换将各个段中的对应部分集合在一起, 最后将这些集 合在一起的各部分采用多路归并的方法进行排序, 这些有序段汇合起来就自然成为全局有序 序列了。1.3.2 PSRS 算法形式化描述设有 n 个数据,P 个处理器,以及均匀分布在 P 个处理器上的 n 个数据。 则 PSRS 算法 可描述如下: 算法 13.5 PSRS 排序算法 输入:n 个待排序的数据,均匀地分布在 P 个处理器上 输出:分布在各个处理器上,得到全局有序的数据序列 Begin (1) 每个处理器将自己的 n/P 个数据用串行快速排序(Quicksort),得到一个排好序的 序列; (2) 每个处理器从排好序的序列中选取第 w,2w,3w,…,(P-1)w 个共 P-1 个数据作为 2 代表元素,其中w=n/P ; (3) 每个处理器将选好的代表元素送到处理器P 0 中,并将送来的P段有序的数据序列 做 P 路归并,再选择排序后的第 P-1,2(P-1),…,(P-1)(P-1)个共 P-1 个主元; (4) 处理器P 0 将这P-1 个主元播送到所有处理器中; (5) 每个处理器根据上步送来的 P-1 个主元把自己的 n/P 个数据分成 P 段,记 w ij 为处 理器P i 的第j+1 段,其中i=0,…,P-1,j=0,…,P-1; (6) 每个处理器送它的第i+1 段给处理器P i ,从而使得第i个处理器含有所有处理器的 第 i 段数据(i=0,…,P-1); (7) 每个处理器再通过 P 路归并排序将上一步的到的数据排序; 从而这 n 个数据便是有 序的。 End4 在该算法中,针对其中的计算部分⑴中的快速排序的代价为O(klogk) ,其 中 k=n/p;第 2 ⑵步中各个处理器选择P个主元的代价为O(P) ;⑶ 中 对P 个主元进行归并并选取新的主元所 2 需代价为O(P +logP) ;⑸中对数据划分的代价为O(P+n/p) ;最后第⑺步中各个处理器进行 2 2 并行归并的代价为O(n/p+logP) ; 。针对通信部分⑶中处理器P 0 收集P 个主元的代价为O(P ) (4)中播送新选择的P个主元的代价为O (P) ; 最 后 第(6)步的多对多播送的通信复杂度与具体 的算法实现相关,其最大不会超过O(n) 。 MPI 源程序请参见章末附录。1.4 小结本章主要讨论了几个典型的并行排序算法,关于枚举匹配算法的具体讨论可参考[1] ; 快速排序算法可以参考文献[2]和[3]中的介绍;文献[4]和[5]讨论了 PSRS 排序算 法。参考文献[1]. Chabbar E, Controle, gestion du parallelisme: tris synchrones et asynchrones. Thesis Universite de Franche-comte, France:1980 [2]. Lorin H. Sorting and sort systems. Don Mills, Ontario: Addison-Wesley, 5 [3]. 陈国良 编著. 并行算法的设计与分析(修订版). 高等教育出版社, ]. Shi H, Schaeffer J. Parallel Sorting by Regular Sampling. Journal of Parallel and Distributed Computing, 14(4), 1992 [5]. Li X, Lu P, Schaeffer J, Shillington J, Wong P S, Shi H. On the Versatility of Parallel Sorting by Regular Sampling. Parallel Computing, 19, 1993附录 PSRS 算法 MPI 源程序1. 源程序 psrs_sort.c#include &stdio.h& #include &stdlib.h& #include &mpi.h& #define INIT_TYPE 10 #define ALLTOONE_TYPE 100 #define ONETOALL_TYPE 200 #define MULTI_TYPE 300 #define RESULT_TYPE 400 #define RESULT_LEN 500 #define MULTI_LEN 600 int S main(int argc,char* argv[]) { long BaseNum = 1; int int PlusN MyID; long DataS int *arr,*arr1; int * int *temp1;MPI_Init(&argc,&argv);5 MPI_Comm_rank(MPI_COMM_WORLD, &MyID); PlusNum=60; DataSize = BaseNum*PlusN } if (MyID==0) printf(&The DataSize is : %lu\n&,PlusNum); Psrs_Main(); if (MyID==0) printf(&\n&); MPI_Finalize(); } Psrs_Main( ) { int i,j; int MyID,SumID; int n,c1,c2,c3,c4,k,l; FILE * MPI_Status status[32*32*2]; MPI_Request request[32*32*2]; MPI_Comm_rank(MPI_COMM_WORLD, &MyID); MPI_Comm_size(MPI_COMM_WORLD, &SumID); Spt = SumID-1; /*初始化参数*/ arr = (int *)malloc(2*DataSize*sizeof(int)); if (arr==0) merror(&malloc memory for arr error!&); arr1 = &arr[DataSize]; if (SumID&1) { temp1 = (int *)malloc(sizeof(int)* SumID*Spt); if (temp1==0) merror(&malloc memory for temp1 error!&); { }index = (int *)malloc(sizeof(int)* 2*SumID); if (index==0) merror(&malloc memory for index error!&);MPI_Barrier( MPI_COMM_WORLD); mylength = DataSize / SumID; srand(MyID); printf(&This is node %d \n&,MyID); printf(&On node %d the input data is:\n&,MyID); for (i=0;i&i++) { arr[i] = (int)rand(); printf(&%d : &,arr[i]); printf(&\n&); /*每个处理器将自己的 n/P 个数据用串行快速 排序(Quicksort),得到一个排好序的序 列,对应于算法 13.5 步骤(1)*/ MPI_Barrier( MPI_COMM_WORLD); quicksort(arr,0,mylength - 1); MPI_Barrier( MPI_COMM_WORLD); /*每个处理器从排好序的序列中选取第 w, 2w, 3w,…,(P-1)w 个共 P-1 个数据作为代 表元素,其中 w=n/P*P,对应于算法 13.5 步骤(2)*/ if (SumID&1) MPI_Barrier(MPI_COMM_WORLD); n = (int)(mylength/(Spt+1)); for (i=0;i&Si++) temp1[i] = arr[(i+1)*n-1]; MPI_Barrier(MPI_COMM_WORLD); if (MyID==0) { /*每个处理器将选好的代表元素送 到处理器 P0 中,对应于算法6 13.5 步骤(3) */ j = 0; for (i=1;i&SumID;i++) MPI_Irecv(&temp1[i*Spt], sizeof(int)*Spt, MPI_CHAR, i,ALLTOONE_TYPE+i, MPI_COMM_WORLD, &request[j++]); MPI_Waitall(SumID-1, request, status); /* 处理器 P0 将上一步送来的 P 段 有序的数据序列做 P 路归并, 再 选 择 排 序 后 的 第 P-1 , 2(P-1), …, (P-1)(P-1)个共 P-1 个主元, , 对应于算法 13.5 步 骤(3)*/ MPI_Barrier( MPI_COMM_WORLD); quicksort(temp1,0,SumID*Spt-1); MPI_Barrier( MPI_COMM_WORLD); for (i=1;i&Spt+1;i++) temp1[i] = temp1[i*Spt-1]; /*处理器 P0 将这 P-1 个主元播送到 所有处理器中,对应于算法 13.5 步骤(4)*/ MPI_Bcast(temp1, sizeof(int)*(1+Spt), MPI_CHAR, 0, MPI_COMM_WORLD); MPI_Barrier( MPI_COMM_WORLD); } else { MPI_Send(temp1,sizeof(int)*Spt, MPI_CHAR,0, ALLTOONE_TYPE+MyID, MPI_COMM_WORLD); MPI_Barrier( MPI_COMM_WORLD); MPI_Barrier( } } }MPI_COMM_WORLD); MPI_Bcast(temp1, sizeof(int)*(1+Spt), MPI_CHAR, 0, MPI_COMM_WORLD); MPI_Barrier( MPI_COMM_WORLD); /*每个处理器根据上步送来的 P-1 个主 元把自己的 n/P 个数据分成 P 段, 记为处理器 Pi 的第 j+1 段,其中 i=0, … ,P-1 , j=0, … ,P-1 ,对应于算 法 13.5 步骤(5)*/ n = index[0] = 0; i = 1; while ((arr[0]&=temp1[i])&&(i&SumID)) { index[2*i-1] = 0; index[2*i] = 0; i++; if (i==SumID) index[2*i-1] = c1 = 0; while (i&SumID) { c4 = temp1[i]; c3 = c2 = (int)((c1+c3)/2); while ((arr[c2]!=c4)&&(c1&c3)) { if (arr[c2]&c4) { c3 = c2-1; c2 = (int)((c1+c3)/2); } else { c1 = c2+1; c2 = (int)((c1+c3)/2); } while ((arr[c2]&=c4)&&(c2&n))7 c2++; if (c2==n) { index[2*i-1] = for (k=i;k&SumID;k++) { index[2*k] = 0; index[2*k+1] = 0; } i = SumID; } else { index[2*i] = c2; index[2*i-1] = c2; } c1 = c2; c2 = (int)((c1+c3)/2); i++; } if (i==SumID) index[2*i-1] = MPI_Barrier( MPI_COMM_WORLD); /*每个处理器送它的第 i+1 段给处理器 Pi,从而使得第 i 个处理器含有所 有处理器的第 i 段数据 (i=0,…,P-1), ,对应于算法 13.5 步 骤(6)*/ j = 0; for (i=0;i&SumID;i++) { if (i==MyID) { temp1[i] = index[2*i+1]index[2*i]; for (n=0;n&SumID;n++) if (n!=MyID) { k = index[2*n+1]index[2*n]; MPI_Send(&k, sizeof(int), MPI_CHAR, { } MPI_Barrier( { j = 0; k = 0; l = 0; for (i=0;i&SumID;i++) { MPI_Barrier( } } } else { }n, MULTI_LEN +MyID, MPI_COMM _WORLD);MPI_Recv(&temp1[i], sizeof(int), MPI_CHAR, i,MULTI_LEN+i, MPI_COMM_WORLD, &status[j++]);MPI_Barrier(MPI_COMM_WORLD);MPI_COMM_WORLD); if (i==MyID) for (n=index[2*i]; n&index[2*i+1]; n++) arr1[k++] = arr[n];MPI_COMM_WORLD); if (i==MyID) for (n=0;n&SumID;n++) if (n!=MyID) { MPI_Send(&arr[ index[2*n]], sizeof(int)* (index[2*n+18 ]-index[2*n]) ,MPI_CHAR, n, MULTI_TYP E+MyID, MPI_COMM _WORLD); } } else { l = temp1[i]; MPI_Recv(&arr1[k], l*sizeof(int), MPI_CHAR, i ,MULTI_TYPE+i, MPI_COMM_WORLD, &status[j++]); k=k+l; } MPI_Barrier( MPI_COMM_WORLD); } mylength = MPI_Barrier(MPI_COMM_WORLD); /*每个处理器再通过 P 路归并排序将上 一步的到的数据排序;从而这 n 个 数据便是有序的, , 对应于算法 13.5 步骤(7) */ k = 0; multimerge(arr1,temp1,arr,&k,SumID); MPI_Barrier(MPI_COMM_WORLD); } printf(&On node %d the sorted data is : \n&,MyID); for (i=0;i&i++) printf(&%d : &,arr[i]); printf(&\n&); } /*输出错误信息*/merror(char* ch) { printf(&%s\n&,ch); exit(1); } /*串行快速排序算法*/ quicksort(int *datas,int bb,int ee) { int tt,i,j; t = datas[bb]; = j = if (i&j) { while(i&j) { while ((i&j)&&(tt&=datas[j])) j--; if (i&j) { datas[i] = datas[j]; i++; while ((i&j)&& (tt&datas[i])) i++; if (i&j) { datas[j] = datas[i]; j--; if (i==j) datas[i] = } else datas[j] = } else datas[i] = } quicksort(datas,bb,i-1); quicksort(datas,i+1,ee); }9 } else /*串行多路归并算法*/ multimerge(int *data1,int *ind,int *data,int *iter,int SumID) { int i,j,n;data2[i]=data1[m++]; if (m==s2+s1) data2[i]=data1[l++]; else if (data1[l]&data1[m]) data2[i]=data1[m++]; else j = 0; for (i=0;i&SumID;i++) if (ind[i]&0) { ind[j++] = ind[i]; if (j&i+1) ind[i] = 0; } if ( j&1 ) { n = 0; for (i =0;i&j,i+1&j;i=i+2) { merge(&(data1[n]),ind[i], ind[i+1],&(data[n])); ind[i] += ind[i+1]; ind[i+1] = 0; n += ind[i]; } if (j%2==1 ) for (i=0;i&ind[j-1];i++) data[n++]=data1[n]; (*iter)++; multimerge(data,ind,data1,iter, SumID); } } } data2[i]=data1[l++];} merge(int *data1,int s1,int s2,int *data2) { int i,l,m; l = 0; m = s1; for (i=0;i&s1+s2;i++) { if (l==s1)10 2. 运行实例编译:mpicc psrs_sort.c Co psrs 运行:可以使用命令 mpirun Cnp SIZE psrs 来运行该串匹配程序,其中 SIZE 是所使用的处 理器个数,本实例中使用了 SIZE=3 个处理器。 mpirun Cnp 3 psrs 运行结果: The DataSize is : 60 This is node 0 On node 0 the input data is: 0 : 21468 : 9988 : 22117 : 3498 : 16927 : 16045 : 19741 : 12122 : 8410 : 12261 : 27052 : 5659 : 9758 : 21087 : 25875 : 32368 : 26233 : 15212 : 17661 : On node 0 the sorted data is : 0 : 2749 : 3498 : 4086 : 5627 : 5659 : 5758 : 7419 : 8410 : 9084 : 9758 : 9988 : 703 : 908 : 2294 : 9212 : This is node 1 On node 1 the input data is: 16838 : 5758 : 10113 : 17515 : 31051 : 5627 : 23010 : 7419 : 16212 : 4086 : 2749 : 12767 : 9084 : 12060 : 32225 : 17543 : 25089 : 21183 : 25137 : 25566 : On node 1 the sorted data is : 10113 : 12060 : 12122 : 12261 : 12767 : 15212 : 16045 : 16212 : 16838 : 1 : 10239 : 10595 : 12508 : 12914 : 14363 : 16134 : This is node 2 On node 2 the input data is: 908 : 22817 : 10239 : 12914 : 25837 : 27095 : 29976 : 27865 : 20302 : 32531 : 26005 : 31251 : 12508 : 14363 : 10595 : 9212 : 17811 : 16134 : 2294 : 703 : On node 2 the sorted data is : 17543 : 17661 : 19741 : 21087 : 21183 : 21468 : 22117 : 23010 : 25089 : 25137 : 25566 : 25875 : 26233 : 27052 : 31051 : 32225 : 32368 : 17811 : 20302 : 22817 : 25837 : 26005 : 27095 : 27865 : 29976 : 31251 : 32531 : 说明:本程序中通过变量 DataSize 指定了待排序序列的长度为 60,顺序输出各个处理器的 局部数据就可以得到全局有序的序列。11 1. 10快速傅氏变换和离散小波变换长期以来,快速傅氏变换 (Fast Fourier Transform) 和离散小波变换 (Discrete Wavelet Transform)在数字信号处理、石油勘探、地震预报、医学断层诊断、编码理论、量子物理及 概率论等领域中都得到了广泛的应用。各种快速傅氏变换(FFT)和离散小波变换(DWT)算法 不断出现, 成为数值代数方面最活跃的一个研究领域, 而其意义远远超过了算法研究的范围, 进而为诸多科技领域的研究打开了一个崭新的局面。 本章分别对 FFT 和 DWT 的基本算法作 了简单介绍,若需在此方面做进一步研究,可参考文献[2]。1.1 快速傅里叶变换 FFT离散傅里叶变换是 20 世纪 60 年代是计算复杂性研究的主要里程碑之一, 1965 年Cooley 和Tukey所研究的计算离散傅里叶变换(Discrete Fourier Test)的快速傅氏变换(FFT)将计算量 从О(n2)下降至О(nlogn),推进了FFT更深层、更广法的研究与应用。FFT算法有很多版本, 但大体上可分为两类:迭代法和递归法,本节仅讨论迭代法,递归法可参见文献[1]、[2]。1.1.1 串行 FFT 迭代算法假定 a[0],a[1], …,a[n-1] 为一个有限长的输入序列,b[0], b[1], …,b[n-1]为离散傅里叶 变换的结果序列,则有: b[k ] = ∑ a[k ]Wnkm (k = 0,1,2,..., n ? 1) ,其中 W n = em=0 n ?12πi n,实际上,上式可写成矩阵 W 和向量 a 的乘积形式:?b0 ? ? w 0 ?b ? ? w 0 ? 1 ? ? ?b2 ? = ? w 0 ? ? ? ?? ? ?? ?bn ?1 ? ? w 0 ? ? ? w0 w0 ? w0 ? ?a 0 ? ? ?a ? w1 w 2 ? w n ?1 ?? 1 ? ? ?a 2 ? w 2 w 4 ? w 2( n ?1) ?? ? ? ? ? ? ? ?? ? ? ? w ( n ?1) w 2( n ?1) ? w ( n ?1)( n ?1) ? ? ?a n ?1 ?一般的n阶矩阵和n维向量相乘,计算时间复杂度为n2,但由 于W是一种特殊矩阵,故可 以降低计算量。FFT的计算流图如图 22.1 所示,其串行算法如下: 算法 22.1 单处理器上的 FFT 迭代算法 输入:a=(a 0 ,a 1 , …,a n-1 ) 输出:b=(b 0 ,b 1 , …,b n-1 ) Begin (1)for k=0 to n-1 do c k =a k end for (2)for h=logn-1 downto 0 do (2.1) p=2 h (2.2) q=n/p (2.3) z=wq/2 (2.4) for k=0 to n-1 do if (k mod p=k mod2p) then (i)c k = c k + c k +p (ii)c k +p =( c k - c k +p )z k modp end if end for end for (3)for k=1 to n-1 do b r(k) = c k /* r(k)为k的位反 */ end for End/* c k 不用(i)计算的新值 */a0 a1 a2 a3 a4 a5 a6 a70 1 2 3 0 2 0 0 0 2 0 0b0 b4 b2 b6 b1 b5 b3 b7图 22.1 n=4 时的 FFT 蝶式变换图显然, FFT 算法的计算复杂度为 O(nlogn)。1.1.2 并行 FFT 算法设P为处理器的个数,一种并行FFT实现时是将n维向量a划分成p个连续的m维子向量, 这里 m = ?n / p ? ,第i个子向量中下标为i×m, …, (i+1)×m-1,其元素被分配至第i号处理器 (i=0,1, …, p-1) 。 由 图 22.1 可以看到,FFT算法由logn步构成,依次以 2logn-1、2logn-2、…、 (h=logn-1, logn-2, …, 1, 0) 。 2、 1 为下标跨度做蝶式计算, 我们称下标跨度为 2h的计算为第h步 并行计算可分两阶段执行:第一阶段,第logn-1 步至第logm步,由于下标跨度h≥ m,各处 理器之间需要通信;第二阶段,第logm-1 步至第 0 步各处理器之间不需要通信。具体并行 算法框架描述如下: 算法 22.2 FFT 并行算法 输入:a=(a 0 ,a 1 , …,a n-1 ) 输出:b=(b 0 ,b 1 , …,b n-1 ) Begin 对所有处理器 my_rank(my_rank=0,…, p-1)同时执行如下的算法: (1)for h=logp-1 downto 0 do /* 第一阶段,第 logn-1 步至第 logm 步各处理器之间需要通信*/ (1.1) t=2i, ,l=2(i+logm) ,q=n/l , z=wq/2 , j= j+1 ,v=2 j /*开始j=0*/ (1.2)if ((my_rank mod t)=(my_rank mod 2t)) then /*本处理器的数据作为变换的前项数据*/ (i) tt= my_rank+p/v (ii)接收由 tt 号处理器发来的数据块,并将接收的数据块存于 c[my_rank*m+n/v]到 c[my_rank*m+n/v+m]之中 (iii)for k=0 to m-1 do c[k]=c[k]+c[k+n/v] c[k+n/v]=( c[k]- c[k+n/v])*z(my_rank*m+k) mod l end for (iv)将存于 c[my_rank*m+n/v]到 c[my_rank*m+n/v+m]之中的数据块发送 到 tt 号处理器 else /*本处理器的数据作为变换的后项数据*/ (v)将存于之中的数据块发送到 my_rank-p/v 号处理器 (vi)接收由 my_rank-p/v 号处理器发来的数据块存于 c 中 end if end for (2)for i=logm-1 downto 0 do /*第二阶段,第 logm-1 步至第 0 步各处理器之间 不需要通信*/ (2.1) l=2i ,q=n/l , z=wq/2 (2.2)for k=0 to m-1 do if ((k mod l)=(k mod 2l)) then c[k]=c[k]+c[k+l] c[k+l]=( c[k]- c[k+l])*z(my_rank*m+k) mod l end if end for end for End 由于各处理器对其局部存储器中的 m 维子向量做变换计算,计算时间为 m log n ;点对 点通信 2 log p 次,每次通信量为 m,通信时间为 2(t s + mt w ) log p ,因此快速傅里叶变换的并 行计算时间为 T p = m log n + 2(t s + mt w ) log p 。 MPI 源程序请参见章末附录。1.2 离散小波变换 DWT 1.2.1 离散小波变换 DWT 及其串行算法先对一维小波变换作一简单介绍。设f(x)为一维输入信号,记 φ jk ( x) = 2 ? j / 2 φ (2 ? j x ? k ) ,ψ jk ( x) = 2 ? j / 2ψ (2 ? j x ? k ) ,这里 φ ( x) 与 ψ ( x) 分别称为定标函数与子波函数, {φ jk ( x)} 与 记P 0 f=f, 在 第 j 级上的一维离散小波变换DWT(Discrete {ψ jk ( x)} 为二个正交基函数的集合。 Wavelet Transform)通过正交投影P j f与Q j f将P j-1 f分解为:Pj ?1 f = Pj f + Q j f = ∑ c kj φ jk + ∑ d kjψk k jk?1 j j ?1 其中: c kj = ∑ h(n)c 2jk + n , d k = ∑ g ( n )c 2 k + n n =0 n =0p ?1p ?1( j = 1,2,..., L, k = 0,1,..., N 2 j ? 1) ,这里,{h(n)}与{g(n)}分别为低通与高通权系数,它们由基函数 {φ jk ( x)} 与 {ψ0jk( x)} 来确定,p 为权系数的长度。 {C n } 为信号的输入数据,N 为输入信号的长度,L 为所需的级数。由上式可见, 每级一维 DWT 与一维卷积计算很相似。所不同的是:在 DWT 中,输出数据下标增加 1 时, 权系数在输入数据的对应点下标增加 2,这称为“间隔取样” 。 算法 22.3 一维离散小波变换串行算法 输入:c0=d0(c 0 0, c 1 0,…, c N-1 0) h=(h 0 , h 1 ,…, h L-1 ) g=(g 0 , g 1 ,…, g L-1 ) 输出:c i j , d i j (i=0, 1,…, N/2j-1, j≥0) Begin (1)j=0, n=N (2)While (n≥1) do (2.1)for i=0 to n-1 do (2.1.1)c i j+1=0, d i j+1=0 (2.1.2)for k=0 to L-1 doj cij +1 = cij +1 + hk c(k + 2 i) mod n ,d i j +1 = d i j +1 + g k d (jk + 2i ) mod nend for end for (2.2)j=j+1, n=n/2 end while End 显然,算法 22.3 的时间复杂度为 O(N*L)。 在实际应用中,很多情况下采用紧支集小波(Compactly Supported Wavelets) ,这时相 应的尺度系数和小波系数都是有限长度的,不失一般性设尺度系数只有有限个非零值: h 1 ,…,h N ,N为偶数,同样取小波使其只有有限个非零值:g 1 ,…,g N 。为简单起见,设尺度系0 数与小波函数都是实数。对有限长度的输入数据序列: c n = x n , n = 1,2, ?, M (其余点的值都看成 0),它的离散小波变换为:c kj +1 = ∑ c nj hn ? 2 kn∈Zd kj +1 = ∑ c nj g n ? 2 kn∈Z j = 0,1, ? , J ? 1其中J为实际中要求分解的步数,最多不超过log 2 M,其逆变换为c nj ?1 =j ∑ c k hn ?2 k k∈Z+j ∑ c k hn ? 2 k k∈Zj = J ,? ,1注意到尺度系数和输入系列都是有限长度的序列, 上述和实际上都只有有限项。 若完全 按照上述公式计算,在经过 J 步分解后,所得到的 J+1 个序列 d k , j = 0,1, ? , J ? 1 和 c k 的jj非零项的个数之和一般要大于 M,究竟这个项目增加到了多少?下面来分析一下上述计算 过程。 j=0 时计算过程为c1 k = ∑ x n hn ? 2 kn =1M 1 dk = ∑ x n g n?2k n =1M不 难 看 出 , c k 的 非 零 值 范 围 为 : k = ? + 1,?,?1,0,?, ? ?2 2 ?1N k=?M? ? ? 1, 即 有 ?N 1 ? M ? ? M + N ? 1? ?1 + ? ? = ? ? 个非零值。 d k 的非零值范围相同。继续往下分解时,非零项出 2 2 ?2? ? ?现的规律相似。分解多步后非零项的个数可能比输入序列的长度增加较多。例如,若输入序 列长度为 100,N=4,则 d k 有 51 项非零, d k 有 27 项非零, d k 有 15 项非零, d k 有 9 项非 零, d k 有 6 项非零, d k 有 4 项非零, c k 有 4 项非零。这样分解到 6 步后得到的序列的非 零项个数的总和为 116 ,超过了输入序列的长度。在数据压缩等应用中,希望总的长度基本 不增加,这样可以提高压缩比、减少存储量并减少实现的难度。 可以采用稍微改变计算公式的方法, 使输出序列的非零项总和基本上和输入序列的非零 项数相等,并且可以完全重构。这种方法也相当于把输入序列进行延长(增加非零项) ,因 而称为延拓法。 只需考虑一步分解的情形,下面考虑第一步分解(j=1)。将输入序列作延拓,若 M 为偶 数,直接将其按 M 为周期延拓;若 M 为奇数,首先令 x M +1 = 0 。然后按 M+1 为周期延拓。 作了这种延拓后再按前述公式计算,相应的变换矩阵已不再是 H 和 G,事实上这时的变换 矩阵类似于循环矩阵。例如,当 M=8,N=4 时矩阵 H 变为:h3 h1 0 0 h3 h4 h2 0 0 h4 0 h3 h1 0 0 0 h4 h2 0 0 0 0 h3 h1 0 0 0 h4 h2 0 h1 0 0 h3 h1 h2 0 0 h4 h21234566当 M=7,N=4 时矩阵 H 变为: h3 h1 0 0 h3h4 h2 0 0 h40 h3 h1 0 00 h4 h2 0 00 0 h3 h1 00 0 h4 h2 0h1 0 0 h3 h1从上述的矩阵表示可以看出, 两种情况下的矩阵内都有完全相同的行, 这说明作了重复 计算,因而从矩阵中去掉重复的那一行不会减少任何信息量,也就是说,这时我们可以对矩 阵进行截短 (即去掉一行) ,使得所得计算结果仍然可以完全恢复原输入信号。 当 M=8, N=4 时截短后的矩阵为:?h3 ? h H =? 1 ?0 ? ? ?0 h4 h2 0 0 0 h3 h1 0 0 h4 h2 0 0 0 h3 h1 0 0 h4 h2 h1 0 0 h3 h2 ? ? 0? 0? ? h4 ? ?当 M=7,N=4 时截短后的矩阵为:?h3 ? h H =? 1 ?0 ? ?0 ? h4 h2 0 0 0 h3 h1 0 0 h4 h2 0 0 0 h3 h1 0 0 h4 h2 h1 0 0 h3 ? ? ? ? ? ? ?这时的矩阵都只有 ??M ? ? 行。分解过程成为: ?2?C 1 = HC 0 D 1 = GC 0向量C1 和D1都只有 ??M ? ? 个元素。重构过程为: ?2?C 0 = H * C 1 + G * D1可以完全重构。矩阵 H,G 有等式 H*H+G*G=I 一般情况下,按上述方式保留矩阵的 ??M ? ? 行,可以完全恢复原信号。 ?2?这种方法的优点是最后的序列的非 0 元素的个数基本上和输入序列的非 0 元素个数相 同,特别是若输入序列长度为 2 的幂,则完全相同,而且可以完全重构输入信号。其代价是 得到的变换系数Dj中的一些元素已不再是输入序列的离散小波变换系数, 对某些应用可能是 不适合的,但在数据压缩等应用领域,这种方法是可行的。1.2.2 离散小波变换并行算法下设输入序列长度N=2t,不失一般性设尺度系数只有有限个非零值:h 0 ,…,h L-1 ,L 为偶数,同样取小波使其只有有限个非零值:g 0 ,…,g L-1 。为简单起见,我们采用的延拓 0 = x , ( n = 0,1, ? , N ? 1) 按周期N延长, 方法计算。 即将有限尺度的序列 c n 使他成为无限长度的序 n 列。这时变换公式也称为周期小波变换。变换公式为:c kj +1 =n∈Z∑ c nj hn?2k = ∑ hn c j n+ 2kn =0L ?1 d kj +1 = ∑ g n d jn =0L ?1n+2kj = 0,1, ? , J ? 1其中&n+2k&表示n+2k对于模N/2j的最小非负剩余。注意这时 c k 和 d k 是周期为N/2j的周 期序列。其逆变换为c nj ?1 =k∈Zjj∑ ckj hn?2k + ∑ ckj g n?2kk∈Zj = J ,? ,1从变换公式中可以看出, 计算输出点 c kj +1 和 d kj +1 , 需要输入序列 c nj 在 n=2k 附近的值 (一 般而言, L 远远小于输入序列的长度) 。 设处理器台数为 p, 将输入数据 c nj (n = 0,1, ? , N / 2 j ? 1) 按块分配给 p 台处理器, 处理器 i 得到数据 c nj (n = i 和 d nj +1 (n = iN N 让处理器 i 负责 c nj +1 , ? , (i + 1) ? 1) , P2 j P2 jN N ? 1) 的计算,则不难看出,处理器 i 基本上只要用到局部数据, , ? , (i + 1) P 2 j +1 P 2 j +1只有 L/2 个点的计算要用到处理器 i+1 中的数据,这时做一步并行数据发送:将处理器 i+1 中前 L-1 个数据发送给处理器 i,则各处理器的计算不再需要数据交换,关于本算法其它描 述可参见文献[1]。 算法 22.4 离散小波变换并行算法 输入:h i (i=0,…, L-1), g i (i=0,…, L-1), c i 0(i=0,…, N-1) 输出:c i k (i=0,…, N/2k-1,k&0) Begin 对所有处理器 my_rank(my_rank=0,…, p-1)同时执行如下的算法: (1)j=0; (2)while (j&r) do (2.1)将数据 c nj (n = 0,1, ? , N / 2 j ? 1) 按块分配给 p 台处理器 (2.2)将处理器 i+1 中前 L-1 个数据发送给处理器 i (2.3)处理器 i 负责 c n (2.4)j=j+1 end while End 这里每一步分解后数据 c nj +1 和 d nj +1 已经是按块存储在 P 台处理器上,因此算法第一步 中的数据分配除了 j=0 时需要数据传送外,其余各步不需要数据传送(数据已经到位) 。因 此,按 LogP 模型,算法的总的通信时间为:2(Lmax(o,g)+l),远小于计算时间 O(N)。 MPI 源程序请参见所附光盘。j +1和 d nj +1 (n = iN N ,? , (i + 1) ? 1) 的计算 j +1 p2 p 2 j +1 1.3 小结本章主要讨论一维 FFT 和 DWT 的简单串、并行算法,二维 FFT 和 DWT 在光学、地震 以及图象信号处理等方面起着重要的作用。限于篇幅,此处不再予以讨论。同样,FFT 和 DWT 的并行算法的更为详尽描述可参见文献[2]和文献[3],专门介绍快速傅氏变换和卷积算 法的著作可参见[4]。另外,二维小波变换的并行计算及相关算法可参见文献[5],LogP 模型 可参考文献[3]。参考文献[1]. [2]. [3]. [4]. 王能超 著.数值算法设计.华中理工大学出版社,1988.9 陈国良 编著.并行计算――结构?算法?编程.高等教育出版社,1999.10 陈国良 编著.并行算法设计与分析(修订版) .高等教育出版社,2002.11 Nussbaumer H J. Fast Fourier Transform and Convolution Algorithms.2nded. SpringerVerlag,1982 [5]. 陈.二维正交子波变换的 VLSI 并行计算.电子学报,):95-97附录 FFT 并行算法的 MPI 源程序1. 源程序 fft.c#include &stdio.h& #include &stdlib.h& #include &complex.h& #include &math.h& #include &mpi.h& #define #define #define MAX_PROCESSOR_NUM 12 MAX_N EPS 50 3.E-8 99 100 101 102 103 104 int main(int argc, char *argv[]) { complex&double& p[MAX_N], q[MAX_N], s[2*MAX_N], r[2*MAX_N]; complex&double& w[2*MAX_N]; complex&double& int variableN void evaluate(complex&double&* f, int beginPos, int endPos, const complex&double&* x, complex&double& *y, int leftPos, int rightPos, int totalLength); MPI_S int rank, int i, j, k, int wL void readDoubleComplex(FILE *f, complex&double& &z); void print(const complex&double&* f, int fLength); void shuffle(complex&double&* f, int beginPos, int endPos);#define PI #define V_TAG #define P_TAG #define #define #define #define Q_TAG R_TAG S_TAG S_TAG2 int everageL int moreL int startP int stopP FILE *printf(&p(t) = &); print(p, variableNum); printf(&q(t) = &); print(q, variableNum); for(i = 1; i & i ++)MPI_Init(&argc, &argv); MPI_Get_rank(MPI_COMM_WORLD, &rank); MPI_Get_size(MPI_COMM_WORLD, &size); if(rank == 0) { fin = fopen(&dataIn.txt&, &r&); if (fin == NULL) { puts(&Not find input data file&); puts(&Please create a file \&dataIn.txt\&&); puts(&&example for dataIn.txt& &); puts(&2&); puts(&1.0 puts(&2.0 exit(-1); } readDoubleComplex(fin, variableNum); if ((variableNum & 1)||(variableNum & MAX_N)) { puts(&variableNum out of range!&); exit(-1); } for(i = 0; i & variableN i ++) readDoubleComplex(fin, p[i]); for(i = 0; i & variableN i ++) readDoubleComplex(fin, q[i]); fclose(fin); puts(&Read from data file \&dataIn.txt\&&); } } 2&); -1&); } else {{ MPI_Send(&variableNum,1, MPI_INT,i, V_TAG, MPI_COMM_WORLD); MPI_Send(p,variableNum, MPI_DOUBLE_COMPLEX,i, P_TAG, PI_COMM_WORLD); MPI_Send(q,variableNum, MPI_DOUBLE_COMPLEX,i, Q_TAG, MPI_COMM_WORLD); }MPI_Recv(&variableNum,1,MPI_INT,0, V_TAG,MPI_COMM_WORLD, &status); MPI_Recv(p,variableNum, MPI_DOUBLE_COMPLEX,0, P_TAG, PI_COMM_WORLD, &status); MPI_Recv(q,variableNum, MPI_DOUBLE_COMPLEX,0, Q_TAG,MPI_COMM_WORLD, &status);wLength = 2*variableN for(i = 0; i & wL i ++) { w[i]= complex&double& (cos(i*2*PI/wLength), sin(i*2*PI/wLength));everageLength = wLength / moreLength = wLength % startPos = moreLength + rank * everageL stopPos = startPos + everageLength - 1; } if(rank == 0) { startPos = 0; stopPos = moreLength+everageLength 1; } evaluate(p, 0, variableNum - 1, w, s, startPos, stopPos, wLength); evaluate(q, 0, variableNum - 1, w, r, startPos, stopPos, wLength); for(i = startP i &= stopP i ++) s[i] = s[i]*r[i]/(wLength*1.0); if (rank & 0) { MPI_Send((s+startPos), everageLength, MPI_DOUBLE_COMPLEX, 0, S_TAG, MPI_COMM_WORLD); MPI_Recv(s,wLength, MPI_DOUBLE_COMPLEX,0, S_ TAG2,MPI_ COMM_WORLD, &status); } else { for(i = 1; i & i ++) { MPI_Recv((s+moreLength+i* everageLength),everageLength, MPI_DOUBLE_COMPLEX, i,S_TAG, MPI_COMM_WORLD, &status); } } for(i = 1; i & i ++) { MPI_Send(s,wLength, MPI_DOUBLE_COMPLEX,i, } MPI_Finalize(); } } else { if (rank & 0) { } }S_TAG2, MPI_COM M_WORLD);for(int i = 1; i & wLength/2; i ++) { temp = w[i]; w[i] = w[wLength - i]; w[wLength - i] = evaluate(s, 0, wLength - 1, w, r, startPos, stopPos, wLength);MPI_Send((r+startPos), everageLength, MPI_DOUBLE_COMPLEX,0, R_TAG, MPI_COMM_WORLD);for(i = 1; i & i ++) { MPI_Recv((r+moreLength+i* everageLength), everageLength, MPI_DOUBLE_COMPLEX, i, R_TAG, MPI_COMM_WORLD, &status);puts(&\nAfter FFT r(t)=p(t)q(t)&); printf(&r(t) = &); print(r, wLength - 1); puts(&&); printf(&Use prossor size = %d\n&,size); void evaluate(complex&double&* f, int beginPos, int endPos, const complex&double&* x, complex&double&* y, int leftPos, int rightPos, int totalLength) { complex&double& tempX[2*MAX_N],tempY1[2*MAX_N], tempY2[2*MAX_N]; int midPos = (beginPos + endPos)/2; if ((beginPos & endPos)||(leftPos & rightPos)) { puts(&Error in use Polynomial!&); exit(-1); } else if(beginPos == endPos) { for(i = leftP i &= rightP i ++) y[i] = f[beginPos]; } else if(beginPos + 1 == endPos) { for(i = leftP i &= rightP i ++) y[i] = f[beginPos] + f[endPos]*x[i]; } else { shuffle(f, beginPos, endPos); for(i = leftP i &= rightP i ++) tempX[i] = x[i]*x[i]; evaluate(f, beginPos, midPos, tempX, tempY1, leftPos, rightPos,totalLength); evaluate(f, midPos+1, endPos, tempX, tempY2, leftPos, rightPos, totalLength); for(i = leftP i &= rightP i ++) y[i] = tempY1[i] + x[i]*tempY2[i]; } } void shuffle(complex&double&* f, int beginPos, int endPos) { }complex&double& temp[2*MAX_N]; int i, for(i = beginP i &= endP i ++) temp[i] = f[i]; j = beginP for(i = beginP i &= endP i +=2) { f[j] = temp[i]; j ++; } for(i = beginPos +1; i &= endP i += 2) { f[j] = temp[i]; j ++; }void print(const complex&double&* f, int fLength) { bool isPrint = if (abs(f[0].real()) & EPS) { printf(“%lf”, f[0].real()); isPrint = } for(i = 1; i & fL i ++) { if (f[i].real() & EPS) { if (isPrint) printf(& + &); else isPrint = printf(&%lft^%d&, f[i].real(),i); } else if (f[i].real() & - EPS) { if(isPrint) printf(& - &); else isPrint = printf(&%lft^%d&, -f[i].real(),i); } } }if (isPrint == false) printf(&0&); printf(&\n&);2. 运行实例编译:mpicc Co fft fft.cc 运行:使用如下命令运行程序 mpirun Cnp 1 fft mpirun Cnp 2 fft mpirun Cnp 3 fft mpirun Cnp 4 fft mpirun Cnp 5 fft 运行结果:Input of file &dataIn.txt& 4 1 0 3 1 3 1 2 1Output of solution Read from data file &dataIn.txt& p(t) = 1 + 3t^1 + 3t^2 + 1t^3 q(t) = 1t^1 + 2t^2 + 1t^3 After FFT r(t)=p(t)q(t) r(t) = 1t^1 + 5t^2 + 10t^3 + 10t^4 + 5t^5 + 1t^6 Use prossor size = 1 End of this running Read from data file &dataIn.txt& p(t) = 1 + 3t^1 + 3t^2 + 1t^3 q(t) = 1t^1 + 2t^2 + 1t^3 After FFT r(t)=p(t)q(t) r(t) = 1t^1 + 5t^2 + 10t^3 + 10t^4 + 5t^5 + 1t^6 Use prossor size = 2 End of this running Read from data file &dataIn.txt& p(t) = 1 + 3t^1 + 3t^2 + 1t^3 q(t) = 1t^1 + 2t^2 + 1t^3 After FFT r(t)=p(t)q(t) r(t) = 1t^1 + 5t^2 + 10t^3 + 10t^4 + 5t^5 + 1t^6 Use prossor size = 3 End of this running Read from data file &dataIn.txt& p(t) = 1 + 3t^1 + 3t^2 + 1t^3 q(t) = 1t^1 + 2t^2 + 1t^3 After FFT r(t)=p(t)q(t) r(t) = 1t^1 + 5t^2 + 10t^3 + 10t^4 + 5t^5 + 1t^6 Use prossor size = 4 End of this running Read from data file &dataIn.txt& p(t) = 1 + 3t^1 + 3t^2 + 1t^3 q(t) = 1t^1 + 2t^2 + 1t^3 After FFT r(t)=p(t)q(t) r(t) = 1t^1 + 5t^2 + 10t^3 + 10t^4 + 5t^5 + 1t^6 Use prossor size = 5 End of this running说明:运行中可以使用参数 ProcessSize,如 mpirun Cnp ProcessSize fft 来运行该程序,其 中 ProcessSize 是所使用的处理器个数, 本实例中依次取 1、2、3、4、5 个处理器分别进行计 算。 1. 2串匹配串匹配(String Matching)问题是计算机科学中的一个基本问题,也是复杂性理论中研 究的最广泛的问题之一。它在文字编辑处理、图像处理、文献检索、自然语言识别、生物学 等领域有着广泛的应用。而且,串匹配是这些应用中最耗时的核心问题,好的串匹配算法能 显著地提高应用的效率。 因此, 研究并设计快速的串匹配算法具有重要的理论价值和实际意 义。 串匹配问题实际上就是一种模式匹配问题, 即在给定的文本串中找出与模式串匹配的子 串的起始位置。最基本的串匹配问题是关键词匹配(Keyword Matching) 。所谓关键词匹配, 是指给定一个长为 n 的文本串 T[1,n]和长为 m 的模式串 P[1,m],找出文本串 T 中与模式 串所有精确匹配的子串的起始位置。串匹配问题包括精确串匹配(Perfect String Matching) 、 随机串匹配(Randomized String Matching)和近似串匹配(Approximate String Matching) 。 另外还有多维串匹配(Multidimensional String Matching)和硬件串匹配(Hardware String Matching)等。 本章中分别介绍改进的 KMP 串匹配算法,采用散列技术的随机串匹配算法,基于过滤 算法的近似串匹配算法,以及它们的 MPI 编程实现。1.1 KMP 串匹配算法1.1.1 KMP 串匹配及其串行算法KMP 算法首先是由 D.E. Knuth、J.H. Morris 以及 V.R. Pratt 分别设计出来的,所以该算 法被命名为 KMP 算法。KMP 串匹配算的基本思想是:对给出的的文本串 T[1,n]与模式 串 P[1,m],假设在模式匹配的进程中,执行 T[i]和 P[j]的匹配检查。 若 T[i]=P[j],则继续 检查 T[i+1]和 P[j+1]是否匹配。若 T[i]≠P[j],则分成两种情况:若 j=1,则模式串右移一位, 检查 T[i+1]和 P[1]是否匹配;若 1&j≤m,则模式串右移 j-next(j)位,检查 T[i]和 P[next(j)] 是否匹配(其中 next 是根据模式串 P[1,m]的本身局部匹配的信息构造而成的) 。重复此过 程直到 j=m 或 i=n 结束。1. 修改的 KMP 算法在原算法基础上很多学者作了一些改进工作,其中之一就是重新定义了 KMP 算法中的 next 函数,即求 next 函数时不但要求 P[1,next(j) -1]=P[j-(next(j) -1),j-1],而且要求 P[next(j)] ≠P[j],记修改后的 next 函数为 newnext。在模式串字符重复高的情况下修改的 KMP 算法比传统 KMP 算法更加有效。 算法 14.1 修改的 KMP 串匹配算法 输入:文本串 T[1,n]和模式串 P[1,m] 输出:是否存在匹配位置 procedure modeifed_KMP Begin (1) i=1,j=1 (2) while i≤n do (2.1) while j≠0 and P[j]≠T[i] do1 j=newnext[j] end while (2.2)if j=m then return true else j=j+1,i=i+1 end if end while (3) return false End 算法 14.2 next 函数和 newnext 函数的计算算法 输入:模式串 P[1,m] 输出:next[1,m+1]和 newnext[1,m] procedure next Begin (1) next[1]=0 (2) j=2 (3) while j≤m do (3.1)i=next[j-1] (3.2)while i≠0 and P[i]≠P[j-1] do i=next[i] end while (3.3)next[j]=i+1 (3.4)j=j+1 end while End procedure newnext Begin (1) newnext(1)=0 (2) j=2 (3) while j≤m do (3.1)i=next(j) (3.2)if i=0 or P[j]≠P[i+1] then newnext[j]=i else newnext[j]=newnext[i] end if (3.3)j=j+1 end while End2 2. 改进的 KMP 算法易知算法 14.1 的时间复杂度为 O(n) ,算 法 14.2 的时间复杂度为 O(m) 。 算 法 14.1 中所给出的 KMP 算法只能找到第一个匹配位置, 实际应用中往往需要找出所有的匹配位置。 下面给出改进后的算法 14.3 便解决了这一问题。 算法 14.3 改进 KMP 串匹配算法 输入:文本串 T[1,n]和模式串 P[1,m] 输出:匹配结果 match[1,n] procedure improved_KMP Begin (1) i=1,j=1 (2) while i≤n do (2.1)while j≠0 and P[j]≠T[i] do j=newnext[j] end while (2.2)if j=m then match[i-(m-1)]=1 j=next[m+1] i=i+1 else i=i+1 j=j+1 end if end while (3) max_prefix_len=j-1 End 算法 14.4 next 函数和 newnext 函数的计算算法 输入:模式串 P[1,m] 输出:next[1,m+1]和 newnext[1,m] procedure NEXT Begin (1) next[1]=newnext[1]=0 (2) j=2 (3) while j≤m+1 do (3.1)i=next[j-1] (3.2)while i≠0 and W[i]≠W[j-1]) do i=next[i] end while (3.3)next[j]=i+1 (3.4)if j≠m+1 then if W[j] ≠W[i+1] then newnext[j]=i+1 else newnext[j]=newnext[i+1]3 end if end if (3.5)j=j+1 end while End 在算法 14.3 中, 内层 while 循环遇到成功比较时和找到文本串中模式串的一个匹配位置 时文本串指针 i 均加 1,所以至多做 n 次比较;内 while 循环每次不成功比较时文本串指针 i 保持不变,但是模式串指针 j 减小,所以 i-j 的值增加且上一次出内循环时的 i-j 值等于下 一次进入时的值,因此不成功的比较次数不大于 i-j 的值的增加值,即小于 n,所以总的比 较次数小于 2n,所以算法 14.3 的时间复杂度为 O(n) 。算法 14.4 只比的算法 14.2 多计算了 next(m+1),至多多做 m-1 次比较,所以算法 14.4 的时间复杂度同样为 O(m) 。1.1.2 KMP 串匹配的并行算法 1. 算法原理在介绍了改进的 KMP 串行算法基础上,这一节主要介绍如何在分布存储环境中对它进 行实现。设计的思路为:将长为 n 的文本串 T 均匀划分成互不重叠的 p 段,分布于处理器 0 到 p-1 中,且使得相邻的文本段分布在相邻的处理器中,显然每个处理器中局部文本段的长 度为 ?n / p ? (最后一个处理器可在其段尾补上其它特殊字符使其长度与其它相同) 。再将长 为 m 的模式串 P 和模式串的 newnext 函数播送到各处理器中。 各处理器使用改进的 KMP 算 法并行地对局部文本段进行匹配,找到所有段内匹配位置。 但是每个局部段(第p-1 段除外)段尾m-1 字符中的匹配位置必须跨段才能找到。一个 简单易行的办法就是每个处理器(处理器p-1 除外)将本局部段的段尾m-1 个字符传送给下 一处理器,下一处理器接收到前一处理器传来的字符串后,再接合本段的段首m-1 个字符构 成一长为 2(m-1)的段间字符串,对此字符串做匹配,就能找到所有段间匹配位置。但是算法 的通信量很大, 采用下述两种改进通信的方法可以大大地降低通信复杂度: ①降低播送模式 串和newnext函数的通信复杂度。利用串的周期性质,先对模式串P作预处理,获得其最小周 期长度|U|、最小周期个数s及后缀长度|V|(P=UsV) ,只需播送U,s,|V|和部分newnext函数 就可以了,从而大大减少了播送模式串和newnext函数的通信量。而且串的最小周期和next 函数之间的关系存在着下面定理 1 所示的简单规律, 使得能够设计出常数时间复杂度的串周 期分析算法。②降低每个处理器(处理器p-1 除外)将本局部文本段的段尾m-1 个字符传送 给下一处理器的通信复杂度。 每个处理器在其段尾m-1 个字符中找到模式串P的最长前缀串, 因为每个处理器上都有模式串信息, 所以只需传送该最长前缀串的长度就行了。 这样就把通 信量从传送模式串P的最长前缀串降低到传送一个整数,从而大大地降低了通信复杂度。而 且KMP算法结束时模式串指针j-1 的值就是文本串尾模式串最大前缀串的长度,这就可以在 不增加时间复杂度的情况下找到此最大前缀串的长度。2. 串的周期性分析定义 14.1 :给定串 P ,如果存在字符串 U 以及正整数 K ≥ 2 ,使得串 P 是串 Uk 的前缀 (Prefix) ,则称字符串U为串P的周期(Period) 。字 符 串P的所有周期中长度最短的周期称为 串P的最小周期(Miximal Period) 。 串的周期分析对最终并行算法设计非常关键,串的最小周期和 next 函数之间的关系存 在着如下定理 14.1 所示的简单规律,基于该规律可以设计出常数时间复杂度的串周期分析4 算法。 定理 14.1:已知串 P 长为 m,则 u=m+1-next(m+1)为串 P 的最小周期长度。 算法 14.5 串周期分析算法 输入:next[m+1] 输出:最小周期长度 period_len 最小周期个数 period_num 模式串的后缀长度 pattern-suffixlen procedure PERIOD_ANALYSIS Begin period_len=m+1-next(m+1) period_num=(int)m/period_len pattern_suffixlen=m mod period_len End3. 并行算法描述在前述的串行算法以及对其并行实现计的分析的基础上,我们可以设计如下的并行 KMP 串匹配算法。 算法 14.6 并行 KMP 串匹配算法 输入:分布存储的文本串T i [1, ?n / p ? ](i=0,1,2,…,p-1) 存储于P 0 的模式串P[1,m] 输出:所有的匹配位置 Begin (1) P 0 call procedure NEXT /*调用算法 14.4,求模式串P的 next 函数和 newnext 函数*/ P 0 call procedure PERIOD_ANALYSIS /*调用算法 14.5 分析P的周期*/ (2) P 0 broadcast period_len,period_num,period_suffixlen to other processors /*播送 P 之最小周期长度、最小周期个数和后缀长度*/ P 0 broadcast P[1,period_len] /*不播送P之最小周期*/ if period_num=1 then /*播送 P 之部分 newnext 函数,如周期为 1,则播送整个 newnext 函数;否则播送 2 倍周期长的 newnext 函数*/ broadcast newnext [1,m] else broadcast newnext[1,2*period_len] end if (3) for i=1 to p-1 par-do /*由传送来的 P 之周期和部分 newnext 函数重构整个模式串 和 newnext 函数*/ P i rebuild newnext end for for i=1 to p-1 par-do /*调用算法 14.7 做局部段匹配, 并获得局部段尾最大前缀串 之长度*/ P i call procedure KMP(T,P ,n ,0,match) end for (4) for i =0 to p-2 par-do5 P i send maxprefixlen to P i+1 end for for i=1 to p-1 par-do P i receive maxprefixlen from P i-1 P i call procedure KMP(Ti,P,m-1, maxprefixlen,match+m-1) /*调用 算法 14.7 做段间匹配*/ end for End 该 算 法 中 调 用 的 KMP 算 法 必 须 重 新 修 改 如 下 , 因 为 做 段 间 匹 配 时 已 经 匹 配 了 maxprefixlen 长度的字符串,所以模式串指针只需从 maxprefixlen+1 处开始。 算法 14.7 重新修改的 KMP 算法 输入:分布存储的文本串和存储于P 0 的模式串P[1,m] 输出:所有的匹配位置 procedure KMP(T,P,textlen, matched_num,match) Begin (1) i=1 (2) j=matched_num+1 (3) while i≤textlen do (3.1)while j≠0 and P[j]≠T[i] do j=newnext[j] end while (3.2)if j=m then match[i-(m-1)]=1 j=next[m+1] i=i+1 else j=j+1 i=i+1 end if end while (4) maxprefixlen=j-1 End 下面从计算时间复杂度和通信时间复杂度两个方面来分析该算法的时间复杂度。 在分析 计算时间复杂度时,假定文本串初始已经分布在各个处理器上,这是合理的,因为文本串一 般很大, 不会有大的变化, 只需要分布一次就可以, 同时也假定结果分布在各处理器上。 本 算法的计算时间由算法步(1)中所调用的算法 14.4 的时间复杂度 O(m)和算法 14.5 的时 间复杂度 O(1) ;算法步(3)和算法步(4)所调用的改进的 KMP 算法 14.7 的时间复杂度 O(n/p)和 O(m)构成。所以本算法总的计算时间复杂度为 O(n/p+m) 。通信复杂度由算 法步(2)播送模式串信息(最小周期串 U 及最小周期长度、周期个数和后缀长度三个整数) 和 newnext 函数(长度为 2u 的整数数组,u 为串 U 的长度)以及算法步(4)传送最大前缀 串长度组成,所以通信复杂度与具体采用的播送算法有关。若采用二分树通信技术,则总的 通信复杂度为 O(ulogp) 。 MPI 源程序请参见章末附录。6 1.2 随机串匹配算法1.2.1 随机串匹配及其串行算法采用上一节所述的 KMP 算法虽然能够找到所有的匹配位置,但是算法的复杂度十分高, 在某些领域并不实用。本节给出了随机串匹配算法,该算法的主要采用了散列(Hash)技术 的思想, 它能提供对数的时间复杂度。 其基本思想是: 为了处理模式长度为 m 的串匹配问题, 可以将任意长为 m 的串映射到 O(logm)整数位上,映射方法须得保证两个不同的串映射到同 一整数的概率非常小。所得到的整数之被视为该串的指纹(Fingerprint) ,如果两个串的指 纹相同则可以判断两个串相匹配。1. 指纹函数本节中假定文本串和模式串取字符集∑={0,1}中的字母。 如上所述,随机串匹配算法的基本策略是将串映射到某些小的整数上。令 T 是长度为 n 的文本串,P 是长度为 m≤n 的模式串,匹配的目的就是识别 P 在 T 中出现的所有位置。考 虑长度为 m 的 T 的所有子串集合 B。这样的起始在位置 i(1≤i≤n-m+1)的子串共有 n-m+1 个。 于是问题可重新阐述为去识别与 P 相同的 B 中元素的问题。 该算法中最重要的是如何选 择一个合适的映射函数(Mapping Function) ,下面将对此进行简单的讨论。 令 F = { f p } p∈s 是函数集,使得 f p 将长度为 m 的串映射到域 D,且要求集合 F 满足下述 ,f p ( X ) 由 O (logm) 三个性质: ①对于任意串 X∈B 以及每一个 p∈S (S 为模式串的取值域) 位组成;②随机选择 f p ∈ F ,它将两个不等的串 X 和 Y 映射到 D 中同一元素的概率是很小 的;③对于每个 p∈S 和 B 中所有串,应该能够很容易的并行计算 f p ( X ) 。 其中 f p ( X ) 上述三个性质中, 性质①隐含着 f p ( X ) 和 f p ( P ) 可以在 O(1)时间内比较, 被称为串 X 的指纹;性质②是说,如果两个串 X 和 Y 的指纹相等,则 X=Y 的概率很高;性质 ③主要是针对该算法的并行实现的需求对集合 F 加以限制。对于串行算法函数 f p 只需要满 足前两个性质即可。 本节中我们采用了这样一类指纹函数:将取值{0,1}上的串 X 集合映射到取值整数环 Z 上的 2 × 2 矩阵。令 f (0) 和 f (1) 定义如下:?1 0? f (0) = ? ?, ?1 1??1 1? f (1) = ? ? ?0 1?该函数目前只满足性质 2,为了使其同时满足性质 1 需要对该函数作如下更改: 令p是区间[1,2,…,M]中的一个素数,其中M是一个待指定的整数;令Z p 是取模p的整 我们定义 f p ( X ) 为 f ( X ) mod p , 即 f p ( X ) 是一个 2 × 2 的矩阵, 数环。 对于每个这样的p, 使得 f p ( X ) 的(i,j)项等于 f ( X ) 的相应项取模p。7 由此构造的函数 f p 同时满足前述性质 1 和 2。2. 串行随机串匹配算法用上面定义的指纹函数可以构造一个随机串匹配算法。先计算模式串 P(1,m)和子串 T(i,i+m-1)的指纹函数(其中 1≤i≤n-m+1,m≤n) ,然后每当 P 的指纹和 T(i,i+m-1) 的指纹相等时,则标记在文本 T 的位置 i 与 P 出现匹配。算法描述如下: 算法 14.8 串行随机串匹配算法 输入:数组 T(1,n)和 P(1,m),整数 M。 输出:数组 MATCH,其分量指明 P 在 T 中出现的匹配位置。 Begin (1) for i=1 to n-m+1 do MATCH(i)=0 end for (2) 在区间[1,2,…,M]中随机的选择一素数,并计算 f p ( P ) (3) for i=1 to n-m+1 do Li= f p (T (i, i + m ? 1)) end for (4) for i=1 to n-m+1 do if Li= f p ( P ) then MATCH(i)=1 end if end for End 很显然在该算法中步骤(1)和(4)的时间复杂度均为 O(n-m) ;步骤(2)和(3)中 求模式串和文本串各个子串的指纹的时间复杂度分别 O(m)和 O(n-m) 。1.2.2 随机串匹配的并行算法对上述串行算法的并行化主要是针对算法 14.7 中步骤(3)的并行处理,也就是说需要 选择一个合适的函数 f p 使其同时满足上述三个性质。前面一节给出了同时满足前两个性质 的函数,这里我们主要针对性质 3 来讨论已得指纹函数类 F。 函数类 F = { f p } 的一个关键性质就是每个 f p 都是同态(Homomorphic)的,即对于任 意两个串 X 和 Y, f P ( XY ) = f p ( X ) f p (Y ) 。下面给出了一个有效的并行算法来计算文本串T 中所有子串的指纹。对 于 每 个 1 ≤ i ≤ n-m+1 , 令N i = f p (T (1, i )),易得N i = f p (T (1)) f p (T (2)) ? f p (T (i )) 。因为矩阵乘法是可结合的,所以此计算就是一种前缀8 和的计算;同时每个矩阵的大小为 2 × 2 ,因此这样的矩阵乘法可以在O(1)时间内完成。 所以,所有的N i 都可以在O(logn)时间内,总共使用O(n)次操作计算。 定 义 14.2 : g p (0) = f p (0)?10? ? 1 ?1 =? , g p (1) = f p (1) ?1 = ? ? ? p ? 1 1? ?0p ? 1? 。则乘积 1 ? ?Ri = g p (T (i )) g p (T (i ? 1)) ? g p (T (1)) 也是一个前缀和计算,并且对于 1 ≤ i ≤ n ,它可以在 O(logn)时间内运用 O(n)次操作计算。 容易看到, f p (T (i, i + m ? 1)) = Ri ?1 N i + m ?1 ,因此,一旦所有的R i 和N i 都计算出来了, 则每个这样的矩阵均可在O(1)时间内计算之。所以,长度为n的正文串T中所有长度为m≤n 的子串的指纹均可在O(logn)时间内使用O(n)次操作而计算之。 这样,函数 f p 同时满足了前述三个性质。在此基础上我们给出了在分布式存储体系结 构上随机串匹配的并行算法。 算法 14.9 并行随机串匹配算法 输入:数组 T(1,n)和 P(1,m),整数 M。 输出:数组 MATCH,其分量指明 P 在 T 中出现的位置。 Begin (1) for i=1 to n-m+1 par-do MATCH(i)=0; end for (2) Select a random prime number in [1,2,…,M],then count f p ( P ) 。 (3) for i=1 to n-m+1 par-do Li= f p (T (i, i + m ? 1)) ; end for (4) for i=1 to n-m+1 par-do if Li= f p ( P ) then MATCH(i)=1 end if end for End 该并行算法的计算复杂度为 O(logn) 。处理期间的通信包括在对文本串到各个处理器 的分派,其通信复杂度为 O(n/p+m) ;以及匹配信息的收集,其通信复杂度为 O(n/p) 。 MPI 源程序请参见所附光盘。1.3 近似串匹配算法1.3.1 近似串匹配及其串行算法前两节所讨论的串匹配算法均属于精确串匹配技术, 它要求模式串与文本串的子串完全9 匹配,不允许有错误。然而在许多实际情况中,并不要求模式串与文本串的子串完全精确地 匹配,因为模式串和文本串都有可能并不是完全准确的。例如,在检索文本时,文本中可能 存在一些拼写错误, 而待检索的关键字也可能存在输入或拼写错误。 在这种情况下的串匹配 问题就是近似串匹配问题。 近似串匹配问题主要是指按照一定的近似标准, 在文本串中找出所有与模式串近似匹配 的子串。近似串匹配问题的算法有很多,按照研究方法的不同大致分为动态规划算法,有限 自动机算法,过滤算法等。但上述所有算法都是针对一般的近似串匹配问题,也就是只允许 有插入、 删除、 替换这三种操作的情况。 本节中还考虑了另外一种很常见的错误―换位, 即, 文本串或模式串中相邻两字符的位置发生了交换, 这是在手写和用键盘进行输入时经常会发 生的一类错误。为修正这类错误引入了换位操作,讨论了允许有插入、删除、替换和换位四 种操作的近似串匹配问题。1. 问题描述:给定两个长度分别为m和n的字符串A[1,m]和B[1,n],a i ,b j ∈∑(1≤i≤m,1≤j≤n, ∑是字母表),串A与串B的编辑距离(Editor Distance)是指将串A变成串B所需要的最少编 辑操作的个数。最常见的编辑操作有三种:①插入(Insert),向串A中插入一个字符;②删 除(Delete),从串 A中删除一个字符;③替换(Replace),将串 A中的一个字符替换成串B中的 一个字符。 其中, 每个编辑操作修正的串A与串B之间的一个不同之处称为一个误差或者错误。 最多带k个误差的串匹配问题(简称为k-differences问题)可描述如下:给定一个长 度为n的文本串T=t 1 …t n ,一个长度为m的模式串P=p 1 …p m ,以及整数k(k≥1) ,在文本串T 中进行匹配查找,如果T的某个子串t i …t j 与模式串P的编辑距离小于等于k,则称在t j 处匹 配成功,要求找出所有这样的匹配结束位置t j 。 另外一种很常见的编辑操作是换位(Exchange): 将串 A 中的两个相邻字符进行交换。 该 操作用于修正相邻两字符位置互换的错误。 一个换位操作相当于一个插入操作加上一个删除 操作。但是,当近似匹配允许的最大误差数 k 一定时,允许有换位操作的情况较之不允许有 换位操作的情况,往往能够找出更多的匹配位置。 例如,假定文本串 T=abcdefghij,模式串 P=bxcegfhy,k=4,问在文本串的第 9 个字符 处是否存在最多带 4 个误差的匹配? 首先考虑允许有换位操作的情况,文本串与模式串的对应关系如下: t1 t2 t3 t4 t5 t6 t7 t8 t 9 t 10 T: a b c d e f g h i j P: b x c e g f h y ① ② ③ ④ 其中,①,②,③,④ 4 个位置分别对应于删除、插入、换位和替换操作,可见在t 9 处确实有最多带 4 个误差的匹配。 不允许有换位操作的情况,对应关系如下: t3 t4 t5 t6 t7 t8 t 9 t 10 t1 t2 T: a b c d e f g h i j P: b x c e g f h y ① ② ③ ④ ⑤ 可以看出,在t 9 处不存在最多带 4 个误差的匹配,因为匹配成功所需要的最少编辑操作 个数为 5。 由此可见, 允许有换位操作的近似串匹配算法比以往未考虑换位操作的算法更加实用有 效。尤其是在文本检索和手写体识别等实际应用中,新算法的检索成功率和识别率更高,准10 确性更好,功能更强。2. k-differences 问题的动态规划算法一般的k-differences问题的一个著名算法采用了动态规划(Dynamic Programming)的 技术,可以在O(mn)时间内解决该问题。其基本思想是:根据文本串T[1,n]和模式串P[1, m]计算矩阵D (m+1)x(n+1) ,其中D[i,j](0≤i≤m,0≤j≤n)是模式串P的前缀子串p 1 …p i 与文本 串T的任意以t j 结尾的子串之间的最小误差个数(即最小编辑距离) 。显然,如果D[m,j] ≤ k,则表明在文本串T的t j 处存在最多带k个误差的匹配。D[i,j]由以下公式计算: D[0,j]=0,0≤j≤n D[i,0]=i,1≤i≤m D[i,j]=min(D[i-1,j]+1,D[i,j-1]+1,D[i-1,j-1]+δ(p i ,t j )) 其中,δ(p i ,t j )=0,如果p i =t j ;否则,δ(p i ,t j )=1。 可以看出,D[i,j]的计算是通过递推方式进行的。D[0,j]对应于模式串为空串,而文 本串长度为j的情况,显然,空串不需要任何编辑操作就能在文本串的任何位置处匹配,所 以D[0,j]=0。D[i,0]对应于模式串长度为i,而文本串为空串的情况,此时,至少需要对 模式串进行i次删除操作才能与文本串(空串)匹配,所以D[i,0]=i。在计算D[i,j]时, D[i-1,j],D[i,j-1],D[i-1,j-1]都已计算出,D[i-1,j]+1,D[i,j-1]+1,D[i-1,j-1]+ δ(p i ,t j )分别对应于对模式串进行插入、删除、替换操作的情况,由于D[i,j]是最小编 辑距离,所以取三者中的最小值。 上述动态规划算法只考虑了插入、删除、替换三种编辑操作,但很容易将其推广到允许 有换位操作的情况。 考虑D[i, j]的计算, 若p i-1 p i =t i t i-1 , 则D[i, j]的一个可能取值是D[i-2, j-2]+1,即,将p i-1 p i 变成t i t i-1 只需要进行一次换位操作,从而最小编辑距离只增加 1。由 此可对上述算法进行简单的修改,使之适用于允许有插入、删除、替换和换位四种编辑操作 的情况。 算法 14.10 允许有换位操作的 k-differences 问题的动态规划算法。 输入:文本串 T[1,n],模式串 P[1,m],近似匹配允许的最大误差数 k。 输出:所有匹配位置。 K-Diff_DynamicProgramming(T,n,P,m,k) Begin (1) for j=0 to n do D[0,j]=0 end for (2) for i=1 to m do D[i,0]=i end for (3) for i=1 to m do (3.1) for j=1 to n do () if (P[i]=T[j]) then D[i,j]=D[i-1,j-1] else if (P[i]=T[j-1] and P[i-1]=T[j]) then D[i,j]=min(D[i-2,j-2]+1,D[i-1,j]+1,D[i,j-1]+1) else D[i,j]=min(D[i-1,j]+1,D[i,j-1]+1,D[i-1,j-1]+1) end if end if () if (i=m and D[i,j]&=k) then 找到 P 在 T 中的一个最多带 k 个误差的近似匹配, 输出匹配结束位置 j;11 end if end for end for End 显然,该算法的时间复杂度为 O(mn) 。3. 基于过滤思想的扩展的近似串匹配算法:经典的动态规划算法在实际应用中速度很慢, 往往不能满足实际问题的需要。 为此, S.Wu 和 U.Manber 以及 Gonzalo Navarro 和 Ricardo Baeza-Yates 等人先后提出了多个基于过滤 (Filtration)思想的更加快速的近似串匹配算法。 这些算法一般分为两步: ①按照一定的过 滤原则搜索文本串, 过滤掉那些不可能发生匹配的文本段; ②再进一步验证剩下的匹配候选 位置处是否真的存在成功匹配。由于在第一步中已经滤去了大部分不可能发生匹配的文本 段,因此大大加速了匹配查找过程。在实际应用中,这些过滤算法一般速度都很快。下面我 们针对前面定义的扩展的近似串匹配问题, 讨论了加入换位操作后的 k-differences 问题的 过滤算法。 过滤算法基于这样一种直观的认识:若模式串P在文本串T的t j 处有一个最多带k个误差 的近似匹配,则P中必然有一些子串是不带误差地出现在T中的,也就是说,P中必然有一些 子串与T中的某些子串是精确匹配的。 引理 14.1:在允许有换位操作的最多带 k 个误差的串匹配问题中,如果将模式串 P 划 分成 2k+1 段,那么对于 P 在 T 中的每一次近似出现(最多带 k 个误差) ,这 2k+1 段中至少 有一段是不带误差地出现在 T 中的。 证明:这个引理很容易证明。试想,将 k 个误差,也就是对模式串 P 所做的 k 个编辑操 作,分布于 2k+1 个子模式串中。由于一个插入,删除或替换操作只能改变一个子模式串, 而一个换位操作有可能改变两个子模式串 (例如, 在子模式串 i 的最后一个字符与子模式串 i+1 的第一个字符之间进行一次换位操作) , 所以 k 个编辑操作最多只能改变 2k 个子模式串。 这就是说,在这 2k+1 个子模式串中,至少有一个是未被改变的,它不带误差地出现在文本 串 T 中,与 T 的某个(些)子串精确匹配。■ 根据上面的引理,我们可设计过滤算法,其原理描述如下: 第 1 步: (1) 将模式串P尽可能均匀地划分成 2k+1 个子模式串P 1 ,P 2 ,…,P 2k+1 , 每个子模式串 的长度为 ?? m ? ? m ? 。 具体地说,如果m=q(2k+1)+r,0≤r<2k+1,那 或 ? 2k + 1? ? ? ? 2k + 1 ? ? ? m ? 的子模式串和 (2k+1)-r 个长度为 ? 2k + 1 ? ?么可以将模式串 P 划分成 r 个长度为 ?? m ? 的子模式串。 ? ? 2k + 1 ? ?(2) 采用一种快速的精确串匹配算法, 在文本串T中查找P 1 ,P 2 ,…,P 2k+1 这 2k+1 个子模 式串,找到的与某个子模式串P ? (1≤ ? ≤2k+1)精确匹配的位置也是可能与整个模式 串P发生最多带k个误差的近似匹配的候选位置。 第 2 步: 采用动态规划算法 (算法14.10) 验证在各候选位置附近是否真的存在最多带k个误差12 的近似匹配。假定在第一步中找到了以p j (1≤j≤m)开头的子模式串P ? (1≤ ? ≤2k+1) 在文本串中的一个精确匹配,且该匹配的起始位置在文本串的t i 处,则需要验证从 t i-j+1-k 到t i-j+m+k 之间的文本段T[i-j+1-k, i-j+m+k]。因为在本文所讨论的问题中,允 许的编辑操作包括插入,删除,替换和换位,近似匹配允许的最大误差数为k,所以 如果在候选位置t i 附近存在最多带k个误差的近似匹配,只可能发生在这个长度为 m+2k的文本段范围内,超出这个范围,误差数就大于k了。因此在上述情况下,只需 要验证T[i-j+1-k,i-j+m+k]就足够了。 算法 14.11 允许有换位操作的 k-differences 问题的过滤算法。 输入:文本串 T(长度 n) ,模式串 P(长度 m) ,近似匹配允许最大误差数 k。 输出:所有匹配位置。 K-Diff_Filtration(T,n,P,m,k) Begin (1) r=m mod (2k+1) (2) j=1 (3) for ? =1 to 2k+1 do (3.1) if ( ? <=r= then len= ?? m ? ? 2k + 1 ? ?else len= ?? m ? ? 2k + 1 ? ?end if (3.2) for each exact matching position i in T reported by search(T,n,P[j,j+len-1],len) do check the area T[i-j+1-k, i-j+m+k] end for (3.3) j=j+len end for End 过滤算法的时间性能与模式串的长度 m,近似匹配允许的最大误差数 k,以及字母表中 各字符出现的概率等因素都密切相关。 误差率α定义为近似匹配允许的最大误差数 k 与模式 串长度 m 的比值,即α=k/m。在误差率α很小的情况下,算法 14.11 的平均时间复杂度为 O (kn) 。1.3.2 近似串匹配的并行算法这一节首先介绍扩展近似串匹配过滤算法在 PRAM 模型上的并行化方法,然后给出了分 布式存储模型的并行化过滤算法。1. 扩展近似串匹配问题的过滤算法在 PRAM 模型上的并行化首先假设有p个处理器。 由于, 最多带k个误差的串匹配问题要求在文本串中找出所有成 功匹配的结束位置t j ,因此可以将整个问题划分成p个子问题,每个子问题的文本串长度为 ,用p个处理器并行求解,每个处理器 ? = ?n / p ? (最后一段长度不够可以用特殊字符补齐) 求解一个子问题。子问题i(1≤i≤p)对应于:求结束于t (i-1) ? +1 ,t (i-1) ? +2 ,…,t i ? 的最多带k个 误差的近似匹配。13 考虑第i个子问题。由于在定义的扩展近似串匹配问题中,允许的编辑操作包括插入、 删除、替换和换位,而允许的最大误差数为k,因此结束于t (i-1) ? +1 的最多带k个误差的近似 匹配的最左起始位置应该是t (i-1) ? +1-(k+m-1) 。这说明,在求解子问题i(2≤i≤p)时,必须结合 前一文本段的最后k+m-1 个字符,才能找出所有的匹配位置。 算法 14.12 扩展近似串匹配问题的基于 PRAM 模型的并行化过滤算法 输入:文本串 T[1,n],模式串 P[1,m],近似匹配允许的最大误差数 k 输出:所有匹配位置 Begin (1) ? = ?n / p ? (2) K-Diff_Filtration(T[1, ? ], ? ,P,m,k) (3) for i=2 to p par-do K-Diff_Filtration(T[(i-1) ? +1-(k+m-1),i ? ], ? +k+m-1,P,m,k) end for End 算法 14.12 的平均时间复杂度的分析与上一节中串行过滤算法的分析完全类似。此时, 用 于 查 找 2k+1 个 子 模 式 串 的 时 间 开 销 为 O((2k+1) ? +m)+O((2k+1)( ? +k+m-1)+m) 3 1/2α =O(kn/p+km),用于验证所有候选位置的时间开销约为 2m α/σ 。通过类似的分析讨论可 以得出如下结论:在误差率α很小的情况下,算法 14.12 的平均时间复杂度为O(kn/p+km), 其中,n、m、k和p分别是文本串长度、模式串长度、近似匹配允许的最大误差数和算法所使 用的处理器个数。2. 扩展近似串匹配问题的基于分布式存储模型的并行化过滤算法扩展近似串匹配问题的基于分布式存储模型的并行化过滤算法与前述的基于 PRAM 模型 的并行化过滤算法在设计原理和设计思路上是完全一样的。 只不过由于是在分布式存储环境 下,文本串和模式串分布存储于不同的处理器上,因此算法中涉及到处理器之间的通信。 算法的设计思路是这样的:将长度为 n 的文本串 T 均匀地划分成长度相等且互不重叠的 p 段(最后一段长度不够可以用特殊字符补齐) 。将这 p 个局部文本串按照先后顺序分布于处 理器 0 到(p-1)上,也就是说,第 1 个局部文本串放在处理器 0 上,第 2 个局部文本串放在 处理器 1 上,……,第 p 个局部文本串放在处理器 p-1 上。这样一来,相邻的局部文本串就 被分布在相邻的处理器上,而且每个处理器上局部文本串的长度均为 ?n / p ? 。算法中假定 长度为 m 的模式串 P 初始存储于处理器 0 上, 所以必须将它播送到各个处理器上, 以便所有 处理器并行求解近似串匹配问题。但是根据上小节中的分析,结束位置在各局部文本串(第 1 个局部文本串除外) 的前 m+k-1 个字符中的那些匹配位置必须跨越该局部文本串才能找到, 具体地说, 就是必须结合前一局部文本串的最后 m+k-1 个字符, 才能找到结束于这些位置的 近似匹配。因此,每个处理器(处理器 p-1 除外)应将它所存储的局部文本串的最后 m+k-1 个字符发送给下一处理器, 下一处理器接收到上一处理器发送来的 m+k-1 个字符, 再结合自 身所存储的长度为 ?n / p ? 的局部文本串进行近似匹配查找,就可以找出所有的匹配位置。 在播送模式串 P 到各处理器上, 以及发送局部文本串最后 m+k-1 个字符到下一处理器的通信 操作结束之后, 各处理器调用 K-Diff_Filtration 算法并行地进行匹配查找, 处理器 0 求解 文本串长度为 ?n / p ? ,模式串长度为 m 的子问题,其它各处理器求解文本串长度为14 ?n / p ? +m+k-1,模式串长度为 m 的子问题。算法 14.13 扩展近似串匹配问题的基于分布式存储模型的并行化过滤算法 输入:分布存储于处理器P i 上的文本串T i [1, ?n / p ? ], 存储于处理器P 0 上的模式串P[1,m], 近似匹配允许的最大误差数 k 输出:分布于各个处理器上的匹配结果 Begin (1) P 0 broadcast m (2) P 0 broadcast P[1,m]; (3) for i=0 to p-2 par-do P i send T i [ ?n / p ? -m-k+2, ?n / p ? ] to P i+1 ; end for (4) for i=1 to p-1 par-do P i receive T i-1 [ ?n / p ? -m-k+2, ?n / p ? ] from P i-1 ; end for (5) P 0 call K-Diff_Filtration(T 0 [1, ?n / p ? ], ?n / p ? ,P,m,k); (6) for i=1 to p-1 par-do P i call K-Diff_Filtration (T i-1 [ ?n / p ? -m-k+2, ?n / p ? ]+T i [1, ?n / p ? ], ?n / p ? +m+k-1,P,m,k); end for End 算法 14.13 的时间复杂度包括两部分,一部分是计算时间复杂度,另一部分是通信时间 复杂度。 算法中假定文本串初始已经分布于各个处理器上, 最终的匹配结果也分布于各个处 理 器 上 。 算 法 的 计 算 时 间 由 算 法 第 (5) 步 中 各 处 理 器 同 时 调 用 算 法 14.12 (K-Diff_Filtration 算法)的时间复杂度构成。根据对算法 14.11 的平均时间复杂度的分 析,在误差率α很小的情况下,算法 14.13 的平均计算时间复杂度为 O(kn/p+km) ,当 模 式 串长度 m 远远小于局部文本串长度 n/p 时,平均计算时间复杂度为 O(kn/p) 。算法的通信 时间由算法第(1)步播送模式串长度 m 的时间,第(3)步播送模式串 P 的时间,以及第(4)步 发送各局部文本串末尾 m+k-1 个字符到下一处理器的时间构成。 通信时间复杂度与具体采用 的播送算法有关。 若以每次发送一个字符的时间为单位时间, 则通信时间复杂度为 O (m+k) 。 MPI 源程序请参见所附光盘。1.4 小结本章主要讨论了几个典型的并行串匹配算法,关于 KMP 算法的具体讨论可参考[1] , [2] , [3] ,Karp 和 Rabin 在[4]中首先提出了随机串匹配的算法,关于该算法的正确性 证明可参阅[5] ;文 献 [6]和[7]分别讨论了近似串匹配,允许由换位操作的近似串匹配15 算法见文献[8] 。参考文献[1]. Knuth D E, Morris J H, Pratt V R. Fast Pattern Matching in Strings. SIAM Journal of Computing, ):322-350 [2]. 朱洪等. 算法设计与分析. 上海科学技术出版社, 1989 [3]. 陈国良, 林洁, 顾乃杰. 分布式存储的并行串匹配算法的设计与分析. 软件学报, (6):771-778 [4]. Karp R M, Rabin M O. Efficient randomized pattern-matching algorithms. IBM J. Res. Develop., ):249-260 [5]. 陈国良 编著. 并行算法的设计与分析(修订版). 高等教育出版社, ]. Wu S, Manber U. Fast test searching allowing errors. Communications of the ACM, ):83-91 [7]. Ricardo B Y, Gonzalo N. Faster Approximate String Matching. In Algorithmica, ) [8]. 姚珍 . 带有换位操作的近似串匹配算法及其并行实现 . 中国科学技术大学硕士论 文,2000.5附录 KMP 串匹配并行算法的 MPI 源程序1. 源程序 gen_ped.c#include &stdio.h& #include &stdlib.h& #include &malloc.h& #include &string.h& /*根据输入参数生成模式串*/ int main(int argc,char *argv[]) { int strlen,pedlen,suffixlen,num,i,j; char * FILE * if((suffixlen=strlen%pedlen)!=0) strlen=atoi(argv[1]); pedlen=atoi(argv[2]); srand(atoi(argv[3])); string=(char*)malloc(strlen*sizeof(char)); if(string==NULL) { printf(&malloc error\n&); exit(1); } else { if((fp=fopen(argv[4],&w&))!=NULL) { fprintf(fp,&%s&,string); fclose(fp); strncpy(string+j*pedlen,string,suffixlen); for(j=1;j&(int)(strlen/pedlen);j++) strncpy(string+j*pedlen,string,pedlen); } for(i=0;i&i++) { num=rand()%26; string[i]='a'+ }16 printf(&file open error\n&); exit(1); } }2. 源程序 kmp.c#include &malloc.h& #include &sys/stat.h& #include &sys/types.h& #include &stdio.h& #include &string.h& #include &mpi.h& #define MAX(m,n) (m&n?m:n) } pped-&pedlen=patlen-next[patlen]; pped-&pednum=(int)(patlen/pped-&pedlen); pped-&psuffixlen=patlen%pped-& free(next); } /*改进的 KMP 算法*/ void kmp(char *T,char*W,int textlen,int patlen, int i,j, int * { if((next=(int *)malloc((patlen+1)*sizeof(int))) ==NULL) { printf(&no enough memory\n&); exit(1); } /*计算 next 和 nextval*/ next[0]=nextval[0]=-1; j=1; while(j&=patlen) { i=next[j-1]; while(i!=(-1) && W[i]!=W[j-1]) i=next[i]; next[j]=i+1; { match[i-(patlen-1)]=1; if(pped-&pednum+pped-&psuffixlen ==1) 17 while(i&textlen) { if((prefix_flag==1)&&((patlen-j)& (textlen-i))) while(j!=(-1) && W[j]!=T[i]) j=nextval[j]; if(j==(patlen-1)) i=matched_ j=matched_ int i,j; int *nextval,pntype *pped,int prefix_flag, int matched_num,int *match,int *prefixlen) } j++; typedef struct{ } /*对模式串进行周期分析,并计算相应的 new 和 newval 值*/ void Next(char *W,int patlen,int *nextval, pntype *pped) { else nextval[j]=nextval[i+1]; if(j!=patlen) { if( W[j]!=W[i+1]) nextval[j]=i+1; j = -1 ; else j=patlen-1-pped-& } j++; i++; } (*prefixlen)=j; } /*重构模式串以及 next 函数*/ void Rebuild_info(int patlen,pntype *pped, int *nextval,char *W) { if (pped-&pednum == 1) memcpy(W+pped-&pedlen,W, pped-&psuffixlen); else { memcpy(W+pped-&pedlen,W, pped-&pedlen); for (i=3; i&=pped-& i++) { memcpy(W+(i-1)*pped-&pedlen,W, pped-&pedlen); memcpy(nextval+(i-1)* pped-&pedlen, nextval+pped-&pedlen, pped-&pedlen*sizeof(int)); } if(pped-&psuffixlen!=0) { memcpy(W+(i-1)*pped-&pedlen,W, pped-&psuffixlen); memcpy(nextval+(i-1)* pped-&pedlen,nextval+ pped-&pedlen, pped-&psuffixlen*sizeof(int)); } } }/*生成文本串*/ void gen_string(int strlen,int pedlen,char *string, int seed) { int suffixlen,num,i,j; srand(seed); for(i=0;i&i++) { num=rand()%26; string[i]='a'+ } for(j=1;j&(int)(strlen/pedlen);j++) strncpy(string+j*pedlen,string,pedlen); if((suffixlen=strlen%pedlen)!=0) strncpy(string+j*pedlen,string,suffixlen); } /*从文件读入模式串信息*/ void GetFile(char *filename,char *place, int *number) { FILE * if ((fp=fopen(filename,&rb&))==NULL) { printf (&Error open file %s\n&,filename); exit(0); } fstat(fileno(fp),&statbuf); if(((*place)=(char *)malloc(sizeof(char)* statbuf.st_size)) == NULL){ printf (&Error alloc memory\n&); exit(1); } if (fread((*place),1,statbuf.st_size,fp) !=statbuf.st_size){ printf (&Error in reading num\n&); exit(0); } fclose (fp); (*number)=statbuf.st_18 } /*打印运行参数信息*/ void PrintFile_info(char *filename,char *T,int id) { FILE * if ((fp=fopen(filename,&a&))==NULL) { printf (&Error open file %s\n&,filename); exit(0); } fprintf (fp,&The Text on node %d is %s .\n&, id,T); fclose (fp); }{ char *T,*W; int int int *nextval,* textlen,patlen,pedlen,nextlen_ i,myid,numprocs,prefixlen, MPI_S MPI_Init(&argc,&argv); MPI_Comm_size(MPI_COMM_WORLD, &numprocs); MPI_Comm_rank(MPI_COMM_WORLD, &myid); nextlen_send=0; ready=1; /*读如文本串长度*/ textlen=atoi(argv[1]); textlen=textlen/ pedlen=atoi(argv[2]);/*打印匹配结果*/ void PrintFile_res(char *filename,int *t,int len, int init,int id) { FILE * if ((fp=fopen(filename,&a&))==NULL) { printf (&Error open file %s\n&,filename); exit(0); } fprintf (fp,&This is the match result on node %d \n&,id); for (i=0; i&=len-1; i++) if(t[i]==1) fprintf (fp,&(%d) else fprintf (fp,&(%d) fclose (fp); } void main(int argc,char *argv[]) -\n&,i+init); +\n&,i+init);if((T=(char *)malloc(textlen*sizeof(char))) ==NULL) { printf(&no enough memory\n&); exit(1); } gen_string(textlen,pedlen,T,myid); if(myid==0) { PrintFile_info(&match_result&,T,myid); if(numprocs&1) MPI_Send(&ready,1,MPI_INT,1,0, MPI_COMM_WORLD); } else { MPI_Recv(&ready,1,MPI_INT,myid-1, myid-1,MPI_COMM_WORLD, &status); PrintFile_info(&match_result&,T,myid); if(myid!=numprocs-1) MPI_Send(&ready,1,MPI_INT, myid+1,myid,19 MPI_COMM_WORLD); } printf(&\n&); if((match=(int *)malloc(textlen*sizeof(int))) ==NULL) { printf(&no enough memory\n&); exit(1); } /*处理器 0 读入模式串,并记录运行参数*/ if(myid==0) { printf(&processor num = %d \n&, numprocs); printf(&textlen = %d\

我要回帖

更多关于 跨度计算公式 的文章

 

随机推荐