[Dart-dev] [6164] DART/branches/development: Added the ability to choose a round_robin MPI task layout to the ensemble manager (the default is to use the standard layout my_pe = my_task_id).

nancy at ucar.edu nancy at ucar.edu
Tue May 21 17:15:47 MDT 2013


Revision: 6164
Author:   hkershaw
Date:     2013-05-21 17:15:46 -0600 (Tue, 21 May 2013)
Log Message:
-----------
Added the ability to choose a round_robin MPI task layout to the ensemble manager (the default is to use the standard layout my_pe = my_task_id). Several other routines have been changed  to use my_pe instead of my_task_id.  Pe 0 is forced to be task 0 in the round-robin layout. This is a conservative measure for the release to avoid any unforseen problems arising from any code making the assumption that task 0 has a copy. For the future we may want to violate this assumption.  filter_state_space_diagnostics was changed to accept the current ensemble time as an arguement.  smoother_ss_diagnosics still assumes that task 0 has an ensemble copy. Note that not all files listed in this commit contain code changes, some only contain svn mergeinfo changes. Added documentation to ensemble_manager_mod.html for the layout and for communication loops.  Note: we need better names for the communication loop flags.

Modified Paths:
--------------
    DART/branches/development/adaptive_inflate/adaptive_inflate_mod.f90
    DART/branches/development/assim_tools/assim_tools_mod.f90
    DART/branches/development/ensemble_manager/ensemble_manager_mod.f90
    DART/branches/development/ensemble_manager/ensemble_manager_mod.html
    DART/branches/development/filter/filter.dopplerfold.f90
    DART/branches/development/filter/filter.f90
    DART/branches/development/obs_model/obs_model_mod.f90
    DART/branches/development/smoother/smoother_mod.f90

Added Paths:
-----------
    DART/branches/development/doc/html/comm_pattern512.png
    DART/branches/development/doc/html/comm_pattern96.png

Property Changed:
----------------
    DART/branches/development/adaptive_inflate/
    DART/branches/development/assim_tools/assim_tools_mod.f90
    DART/branches/development/matlab/GetClmInfo.m
    DART/branches/development/models/bgrid_solo/model_mod.f90
    DART/branches/development/models/cam/
    DART/branches/development/models/clm/
    DART/branches/development/obs_kind/DEFAULT_obs_kind_mod.F90
    DART/branches/development/observations/NCEP/ascii_to_obs/real_obs_mod.f90
    DART/branches/development/observations/snow/
    DART/branches/development/utilities/
    DART/branches/development/utilities/closest_member_tool.f90

-------------- next part --------------

Property changes on: DART/branches/development/adaptive_inflate
___________________________________________________________________
Modified: svn:mergeinfo
   - /DART/branches/inf_restart:4784-4812
/DART/trunk/adaptive_inflate:4680-5706
   + /DART/branches/helen/adaptive_inflate:5995-6161
/DART/branches/inf_restart:4784-4812
/DART/trunk/adaptive_inflate:4680-5706

Modified: DART/branches/development/adaptive_inflate/adaptive_inflate_mod.f90
===================================================================
--- DART/branches/development/adaptive_inflate/adaptive_inflate_mod.f90	2013-05-21 21:52:18 UTC (rev 6163)
+++ DART/branches/development/adaptive_inflate/adaptive_inflate_mod.f90	2013-05-21 23:15:46 UTC (rev 6164)
@@ -18,7 +18,8 @@
                                  error_handler, E_ERR, E_MSG
 use random_seq_mod,       only : random_seq_type, random_gaussian, init_random_seq
 use ensemble_manager_mod, only : ensemble_type, all_vars_to_all_copies, all_copies_to_all_vars, &
-                                 read_ensemble_restart, write_ensemble_restart, get_copy_owner_index
+                                 read_ensemble_restart, write_ensemble_restart, get_copy_owner_index, &
+                                 map_pe_to_task
 use mpi_utilities_mod,    only : my_task_id, send_to, receive_from
 
 implicit none
@@ -205,11 +206,11 @@
       ! a transpose.
       if (.not. mean_from_restart) then
          call get_copy_owner_index(ss_inflate_index, owner, owners_index)
-         if (owner == my_task_id()) ens_handle%vars(:, owners_index) = inf_initial
+         if (owner == ens_handle%my_pe) ens_handle%vars(:, owners_index) = inf_initial
       endif
       if (.not. sd_from_restart) then
          call get_copy_owner_index(ss_inflate_sd_index, owner, owners_index)
-         if (owner == my_task_id()) ens_handle%vars(:, owners_index) = sd_initial
+         if (owner == ens_handle%my_pe) ens_handle%vars(:, owners_index) = sd_initial
       endif
    endif
 
@@ -230,12 +231,12 @@
          ! someone else has the inf array.  have the owner send the min/max
          ! values to PE0.  after this point only PE0 has the right value
          ! in minmax_mean, but it is the only one who is going to print below.
-         if (my_task_id() == 0) then
-            call receive_from(owner, minmax_mean)
-         else if (my_task_id() == owner) then
+         if (ens_handle%my_pe == 0) then
+            call receive_from(map_pe_to_task(ens_handle, owner), minmax_mean)
+         else if (ens_handle%my_pe == owner) then
             minmax_mean(1) = minval(ens_handle%vars(:, owners_index))
             minmax_mean(2) = maxval(ens_handle%vars(:, owners_index))
-            call send_to(0, minmax_mean)
+            call send_to(map_pe_to_task(ens_handle, 0), minmax_mean)
          endif
       endif
    endif
@@ -250,12 +251,12 @@
          ! someone else has the sd array.  have the owner send the min/max
          ! values to PE0.  after this point only PE0 has the right value
          ! in minmax_sd, but it is the only one who is going to print below.
-         if (my_task_id() == 0) then
-            call receive_from(owner, minmax_sd)
-         else if (my_task_id() == owner) then
+         if (ens_handle%my_pe == 0) then
+            call receive_from(map_pe_to_task(ens_handle, owner), minmax_sd)
+         else if (ens_handle%my_pe == owner) then 
             minmax_sd(1) = minval(ens_handle%vars(:, owners_index))
             minmax_sd(2) = maxval(ens_handle%vars(:, owners_index))
-            call send_to(0, minmax_sd)
+            call send_to(map_pe_to_task(ens_handle, 0), minmax_sd)
          endif
       endif
    endif
@@ -268,9 +269,9 @@
    ! ensure the entire array contains a single constant value to match what the code uses.
    if(inf_flavor == 3) then
       call get_copy_owner_index(ss_inflate_index, owner, owners_index)
-      if (owner == my_task_id()) ens_handle%vars(:, owners_index) = ens_handle%vars(1, owners_index)
+      if (owner == ens_handle%my_pe) ens_handle%vars(:, owners_index) = ens_handle%vars(1, owners_index)
       call get_copy_owner_index(ss_inflate_sd_index, owner, owners_index)
-      if (owner == my_task_id()) ens_handle%vars(:, owners_index) = ens_handle%vars(1, owners_index)
+      if (owner == ens_handle%my_pe) ens_handle%vars(:, owners_index) = ens_handle%vars(1, owners_index)
    endif
 
 !------ Block for obs. space inflation initialization ------

Modified: DART/branches/development/assim_tools/assim_tools_mod.f90
===================================================================
--- DART/branches/development/assim_tools/assim_tools_mod.f90	2013-05-21 21:52:18 UTC (rev 6163)
+++ DART/branches/development/assim_tools/assim_tools_mod.f90	2013-05-21 23:15:46 UTC (rev 6164)
@@ -40,7 +40,8 @@
                                  LocationDims, vert_is_surface, has_vertical_localization
 
 use ensemble_manager_mod, only : ensemble_type, get_my_num_vars, get_my_vars,             & 
-                                 compute_copy_mean_var, get_var_owner_index
+                                 compute_copy_mean_var, get_var_owner_index,              &
+                                 map_pe_to_task 
 
 use mpi_utilities_mod,    only : my_task_id, broadcast_send, broadcast_recv,              & 
                                  sum_across_tasks
@@ -489,7 +490,8 @@
 
    ! Following block is done only by the owner of this observation
    !-----------------------------------------------------------------------
-   if(my_task_id() == owner) then
+   if(ens_handle%my_pe == owner) then
+
       obs_qc = obs_ens_handle%copies(OBS_GLOBAL_QC_COPY, owners_index)
       ! Only value of 0 for DART QC field should be assimilated
       IF_QC_IS_OKAY: if(nint(obs_qc) ==0) then
@@ -551,14 +553,14 @@
       !Broadcast the info from this obs to all other processes
       ! What gets broadcast depends on what kind of inflation is being done
       if(local_varying_ss_inflate) then
-         call broadcast_send(owner, obs_prior, obs_inc, orig_obs_prior_mean, &
+         call broadcast_send(map_pe_to_task(ens_handle, owner), obs_prior, obs_inc, orig_obs_prior_mean, &
             orig_obs_prior_var, net_a, scalar1=obs_qc)
 
       else if(local_single_ss_inflate .or. local_obs_inflate) then
-         call broadcast_send(owner, obs_prior, obs_inc, net_a, &
+         call broadcast_send(map_pe_to_task(ens_handle, owner), obs_prior, obs_inc, net_a, &
            scalar1=my_inflate, scalar2=my_inflate_sd, scalar3=obs_qc)
       else
-         call broadcast_send(owner, obs_prior, obs_inc, net_a, scalar1=obs_qc)
+         call broadcast_send(map_pe_to_task(ens_handle, owner), obs_prior, obs_inc, net_a, scalar1=obs_qc)
       endif
 
    ! Next block is done by processes that do NOT own this observation
@@ -567,13 +569,13 @@
       ! I don't store this obs; receive the obs prior and increment from broadcast
       ! Also get qc and inflation information if needed
       if(local_varying_ss_inflate) then
-         call broadcast_recv(owner, obs_prior, obs_inc, orig_obs_prior_mean, &
+         call broadcast_recv(map_pe_to_task(ens_handle, owner), obs_prior, obs_inc, orig_obs_prior_mean, &
             orig_obs_prior_var, net_a, scalar1=obs_qc)
       else if(local_single_ss_inflate .or. local_obs_inflate) then
-         call broadcast_recv(owner, obs_prior, obs_inc, net_a, &
+         call broadcast_recv(map_pe_to_task(ens_handle, owner), obs_prior, obs_inc, net_a, &
             scalar1=my_inflate, scalar2=my_inflate_sd, scalar3=obs_qc)
       else
-         call broadcast_recv(owner, obs_prior, obs_inc, net_a, scalar1=obs_qc)
+         call broadcast_recv(map_pe_to_task(ens_handle, owner), obs_prior, obs_inc, net_a, scalar1=obs_qc)
       endif
    endif
    !-----------------------------------------------------------------------
@@ -651,7 +653,7 @@
             rev_num_close_obs = count_close(num_close_obs, close_obs_ind, my_obs_kind, &
                                               close_obs_dist, cutoff_rev*2.0_r8)
 
-      
+
             ! GSR output the new cutoff 
             ! Here is what we might want: 
             ! time, ob index #, ob location, new cutoff, the assimilate obs count, owner (which process has this ob)


Property changes on: DART/branches/development/assim_tools/assim_tools_mod.f90
___________________________________________________________________
Modified: svn:mergeinfo
   - /DART/releases/Kodiak/assim_tools/assim_tools_mod.f90:5020-5583
/DART/trunk/assim_tools/assim_tools_mod.f90:4680-5706
   + /DART/branches/helen/assim_tools/assim_tools_mod.f90:5995-6161
/DART/releases/Kodiak/assim_tools/assim_tools_mod.f90:5020-5583
/DART/trunk/assim_tools/assim_tools_mod.f90:4680-5706

Added: DART/branches/development/doc/html/comm_pattern512.png
===================================================================
(Binary files differ)


Property changes on: DART/branches/development/doc/html/comm_pattern512.png
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream

Added: DART/branches/development/doc/html/comm_pattern96.png
===================================================================
(Binary files differ)


Property changes on: DART/branches/development/doc/html/comm_pattern96.png
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream

Modified: DART/branches/development/ensemble_manager/ensemble_manager_mod.f90
===================================================================
--- DART/branches/development/ensemble_manager/ensemble_manager_mod.f90	2013-05-21 21:52:18 UTC (rev 6163)
+++ DART/branches/development/ensemble_manager/ensemble_manager_mod.f90	2013-05-21 23:15:46 UTC (rev 6164)
@@ -29,6 +29,7 @@
 use time_manager_mod,  only : time_type, set_time
 use random_seq_mod,    only : random_seq_type, init_random_seq, random_gaussian
 use mpi_utilities_mod, only : task_count, my_task_id, send_to, receive_from, task_sync
+use sort_mod,          only : index_sort
 
 implicit none
 private
@@ -49,7 +50,8 @@
           get_my_vars,                compute_copy_mean,        compute_copy_mean_sd,   &
           get_copy,                   put_copy,                 all_vars_to_all_copies, &
           all_copies_to_all_vars,     read_ensemble_restart,    write_ensemble_restart, &
-          compute_copy_mean_var,      get_copy_owner_index,     set_ensemble_time
+          compute_copy_mean_var,      get_copy_owner_index,     set_ensemble_time,      &
+          map_task_to_pe,             map_pe_to_task
 
 type ensemble_type
    !DIRECT ACCESS INTO STORAGE IS USED TO REDUCE COPYING: BE CAREFUL
@@ -63,6 +65,11 @@
    ! Time is only related to var complete
    type(time_type), pointer :: time(:)
    integer                  :: distribution_type
+   integer, allocatable     :: task_to_pe_list(:), pe_to_task_list(:) ! List of tasks
+   ! Flexible my_pe, layout_type which allows different task layouts for different ensemble handles
+   integer                  :: my_pe    
+   integer                  :: layout_type           
+
 end type ensemble_type
 
 ! Logical flag for initialization of module
@@ -72,7 +79,7 @@
 character(len = 129) :: errstring
 
 ! Module storage for pe information for this process avoids recomputation
-integer              :: my_pe, num_pes
+integer              :: num_pes
 
 !-----------------------------------------------------------------
 !
@@ -86,15 +93,22 @@
 logical  :: single_restart_file_out = .true.
 ! Size of perturbations for creating ensembles when model won't do it
 real(r8) :: perturbation_amplitude  = 0.2_r8
-! Options to change order of loops in the transposes
+! Options to change order of communication loops in the transpose routines
 logical  :: use_copy2var_send_loop = .true.
 logical  :: use_var2copy_rec_loop = .true.
+! task layout options:
+integer  :: layout = 1 ! default to my_pe = my_task_id(). Layout2 assumes that the user knows the correct tasks_per_node
+integer  :: tasks_per_node = 1 ! default to 1 if the user does not specify a number of tasks per node.
+logical  :: debug = .false.
 
 namelist / ensemble_manager_nml / single_restart_file_in,  &
                                   single_restart_file_out, &
                                   perturbation_amplitude,  &
                                   use_copy2var_send_loop,  &
-                                  use_var2copy_rec_loop
+                                  use_var2copy_rec_loop,   &
+                                  layout, tasks_per_node,  &
+                                  debug
+                                  
 !-----------------------------------------------------------------
 
 contains
@@ -102,11 +116,12 @@
 !-----------------------------------------------------------------
 
 subroutine init_ensemble_manager(ens_handle, num_copies, &
-   num_vars, distribution_type_in)
+   num_vars, distribution_type_in, layout_type)
 
 type(ensemble_type), intent(out)            :: ens_handle
 integer,             intent(in)             :: num_copies, num_vars
 integer,             intent(in), optional   :: distribution_type_in
+integer,             intent(in), optional   :: layout_type
 
 integer :: iunit, io
 
@@ -136,13 +151,37 @@
 
    ! Get mpi information for this process; it's stored in module storage
    num_pes = task_count()
-   my_pe = my_task_id()
 endif
 
+! Optional layout_type argument to assign how my_pe is related to my_task_id
+! layout_type can be set individually for each ensemble handle. It is not advisable to do this
+! because get_obs_ens assumes that the layout is the same for each ensemble handle.
+if(.not. present(layout_type) ) then
+   ens_handle%layout_type = layout ! namelist option
+else
+   ens_handle%layout_type = layout_type
+endif
+
+! Check for error: only layout_types 1 and 2 are implemented
+if (ens_handle%layout_type /= 1 .and. ens_handle%layout_type /=2 ) then
+   call error_handler(E_ERR, 'init_ensemble_manager', 'only layout values 1 (standard), 2(round-robin) allowed ', source, revision, revdate)
+endif
+
+allocate(ens_handle%task_to_pe_list(num_pes))
+allocate(ens_handle%pe_to_task_list(num_pes))
+
+call assign_tasks_to_pes(ens_handle, num_copies, ens_handle%layout_type)
+ens_handle%my_pe = map_task_to_pe(ens_handle, my_task_id())
+
 ! Set the global storage bounds for the number of copies and variables
 ens_handle%num_copies = num_copies
 ens_handle%num_vars = num_vars
 
+if(debug .and. my_task_id()==0) then
+   print*, 'pe_to_task_list', ens_handle%pe_to_task_list
+   print*, 'task_to_pe_list', ens_handle%task_to_pe_list
+endif
+
 ! Figure out how the ensemble copies are partitioned
 call set_up_ens_distribution(ens_handle)
 
@@ -199,15 +238,15 @@
 !-------- Block for single restart file or single member  being perturbed -----
 if(single_restart_file_in .or. .not. start_from_restart .or. &
    single_file_override) then 
-   ! Single restart file is read only by master_pe and then distributed
-   if(my_pe == 0) iunit = open_restart_read(file_name)
+   ! Single restart file is read only by task 0 and then distributed
+   if(my_task_id() == 0) iunit = open_restart_read(file_name)
    allocate(ens(ens_handle%num_vars))   ! used to be on stack.
 
    ! Loop through the total number of copies
    do i = start_copy, end_copy
-      ! Only master_pe does reading. Everybody can do their own perturbing
-      if(my_pe == 0) then
-         ! Read restarts in sequence; only read once for not start_from_restart
+     ! Only task 0 does reading. Everybody can do their own perturbing
+       if(my_task_id() == 0) then
+       ! Read restarts in sequence; only read once for not start_from_restart
          if(start_from_restart .or. i == start_copy) &
             call aread_state_restart(ens_time, ens, iunit)
          ! Override this time if requested by namelist
@@ -215,15 +254,16 @@
       endif
 
       ! Store this copy in the appropriate place on the appropriate process
-      call put_copy(0, ens_handle, i, ens, ens_time)
-
+      ! map from my_pe to physical task number all done in send and recieves only
+     call put_copy(map_task_to_pe(ens_handle,0), ens_handle, i, ens, ens_time)
    end do
    
    deallocate(ens)
-   ! Master pe must close the file it's been reading
-   if(my_pe == 0) call close_restart(iunit)
+   ! Task 0 must close the file it's been reading
+   if(my_task_id() == 0) call close_restart(iunit)
 
 else
+
 !----------- Block that follows is for multiple restart files -----------
    ! Loop to read in all my ensemble members
    READ_MULTIPLE_RESTARTS: do i = 1, ens_handle%my_num_copies
@@ -299,17 +339,18 @@
 ! Need to force single restart file for inflation files
 if(single_restart_file_out .or. single_file_forced) then
 
-   ! Single restart file is written only by the master_pe
-   if(my_pe == 0) then
-      iunit = open_restart_write(file_name)
+   ! Single restart file is written only by task 0
+   if(my_task_id() == 0) then
 
+     iunit = open_restart_write(file_name)
+
       ! Loop to write each ensemble member
       allocate(ens(ens_handle%num_vars))   ! used to be on stack.
       do i = start_copy, end_copy
          ! Figure out where this ensemble member is being stored
          call get_copy_owner_index(i, owner, owners_index)
-         ! If it's on the master pe, just write it
-         if(owner == 0) then
+         ! If it's on task 0, just write it
+         if(map_pe_to_task(ens_handle, owner) == 0) then
             call awrite_state_restart(ens_handle%time(owners_index), &
                ens_handle%vars(:, owners_index), iunit)
          else
@@ -317,15 +358,13 @@
             ! This communication assumes index numbers are monotonically increasing
             ! and that communications is blocking so that there are not multiple 
             ! outstanding messages from the same processors (also see send_to below).
-            call receive_from(owner, ens, ens_time)
+            call receive_from(map_pe_to_task(ens_handle, owner), ens, ens_time)
             call awrite_state_restart(ens_time, ens, iunit)
          endif
       end do
       deallocate(ens)
       call close_restart(iunit)
-   else
-      ! If I'm not the master pe with a single restart, I must send my copies
-      ! to master pe for writing to file
+   else ! I must send my copies to task 0 for writing to file
       do i = 1, ens_handle%my_num_copies
          ! Figure out which global index this is
          global_index = ens_handle%my_copies(i)
@@ -393,9 +432,9 @@
 call get_copy_owner_index(copy, owner, owners_index)
 
 !----------- Block of code that must be done by receiving pe -----------------------------
-if(my_pe == receiving_pe) then
+if(ens_handle%my_pe == receiving_pe) then
    ! If PE that stores is the same, just copy and return
-   if(my_pe == owner) then
+   if(ens_handle%my_pe == owner) then
       vars = ens_handle%vars(:, owners_index)
       if(present(mtime)) mtime = ens_handle%time(owners_index)
       ! If I'm the receiving PE and also the owner, I'm all finished; return
@@ -403,16 +442,17 @@
    endif
  
    ! Otherwise, must wait to receive vars and time from storing pe
-      call receive_from(owner, vars, mtime)
+      call receive_from(map_pe_to_task(ens_handle, owner), vars, mtime)
 endif
 
 !----- Block of code that must be done by PE that stores the copy IF it is NOT receiver -----
-if(my_pe == owner) then
+if(ens_handle%my_pe == owner) then
    ! Send copy to receiving pe
+
    if(present(mtime)) then
-      call send_to(receiving_pe, ens_handle%vars(:, owners_index), ens_handle%time(owners_index))
+      call send_to(map_pe_to_task(ens_handle, receiving_pe), ens_handle%vars(:, owners_index), ens_handle%time(owners_index))
    else
-      call send_to(receiving_pe, ens_handle%vars(:, owners_index))
+      call send_to(map_pe_to_task(ens_handle, receiving_pe), ens_handle%vars(:, owners_index))
    endif
 endif
 !------ End of block ---------------------------------------------------------------------
@@ -454,9 +494,9 @@
 call get_copy_owner_index(copy, owner, owners_index)
 
 ! Block of code that must be done by PE that is to send the copy
-if(my_pe == sending_pe) then
+if(ens_handle%my_pe == sending_pe) then
    ! If PE that stores is the same, just copy and return
-   if(my_pe == owner) then
+   if(ens_handle%my_pe == owner) then
       ens_handle%vars(:, owners_index) = vars
       if(present(mtime)) ens_handle%time(owners_index) = mtime
       ! If I'm the sending PE and also the owner, I'm all finished; return
@@ -464,16 +504,17 @@
    endif
  
    ! Otherwise, must send vars and possibly time to storing pe
-      call send_to(owner, vars, mtime)
+   call send_to(map_pe_to_task(ens_handle, owner), vars, mtime)
+
 endif
 
 ! Block of code that must be done by PE that stores the copy IF it is NOT sender
-if(my_pe == owner) then
+if(ens_handle%my_pe == owner) then
    ! Need to receive copy from sending_pe
    if(present(mtime)) then
-      call receive_from(sending_pe, ens_handle%vars(:, owners_index), ens_handle%time(owners_index))
+      call receive_from(map_pe_to_task(ens_handle, sending_pe), ens_handle%vars(:, owners_index), ens_handle%time(owners_index))
    else
-      call receive_from(sending_pe, ens_handle%vars(:, owners_index))
+      call receive_from(map_pe_to_task(ens_handle, sending_pe), ens_handle%vars(:, owners_index))
    endif
 endif
 
@@ -527,7 +568,7 @@
 
 ! Free up the allocated storage
 deallocate(ens_handle%my_copies, ens_handle%time, ens_handle%my_vars, &
-           ens_handle%vars,    ens_handle%copies)
+           ens_handle%vars,    ens_handle%copies, ens_handle%task_to_pe_list, ens_handle%pe_to_task_list)
 
 end subroutine end_ensemble_manager
 
@@ -658,7 +699,7 @@
 ! Compute the total number of copies I'll get for var complete
 num_per_pe_below = ens_handle%num_copies / num_pes
 num_left_over = ens_handle%num_copies - num_per_pe_below * num_pes
-if(num_left_over >= (my_pe + 1)) then
+if(num_left_over >= (ens_handle%my_pe + 1)) then
    ens_handle%my_num_copies = num_per_pe_below + 1
 else
    ens_handle%my_num_copies = num_per_pe_below
@@ -667,7 +708,7 @@
 ! Do the same thing for copy complete: figure out which vars I get
 num_per_pe_below = ens_handle%num_vars / num_pes
 num_left_over = ens_handle%num_vars - num_per_pe_below * num_pes
-if(num_left_over >= (my_pe + 1)) then
+if(num_left_over >= (ens_handle%my_pe + 1)) then
    ens_handle%my_num_vars = num_per_pe_below + 1
 else
    ens_handle%my_num_vars = num_per_pe_below
@@ -685,22 +726,22 @@
 ens_handle%copies = MISSING_R8
 
 ! Fill out the number of my members
-call get_copy_list(ens_handle%num_copies, my_pe, ens_handle%my_copies, i)
+call get_copy_list(ens_handle%num_copies, ens_handle%my_pe, ens_handle%my_copies, i)
 
 ! Initialize times to missing
+! This is only initializing times for pes that have ensemble copies
 do i = 1, ens_handle%my_num_copies
    ens_handle%time(i) = set_time(0, 0)
 end do
 
 ! Fill out the number of my vars
-call get_var_list(ens_handle%num_vars, my_pe, ens_handle%my_vars, i)
+call get_var_list(ens_handle%num_vars, ens_handle%my_pe, ens_handle%my_vars, i)
 
 end subroutine set_up_ens_distribution
 
 !-----------------------------------------------------------------
 
 subroutine get_copy_owner_index(copy_number, owner, owners_index)
-!!!subroutine get_copy_owner_index(copy_number, owner, owners_index, distribution_type)
 
 ! Given the copy number, returns which PE stores it when copy complete
 ! and its index in that pes local storage. Depends on distribution_type
@@ -708,10 +749,10 @@
 
 integer, intent(in)  :: copy_number
 integer, intent(out) :: owner, owners_index
-!!!integer, intent(in) :: distribution_type
 
 integer :: div
 
+! Asummes distribution type 1
 div = (copy_number - 1) / num_pes
 owner = copy_number - div * num_pes - 1
 owners_index = div + 1
@@ -721,7 +762,6 @@
 !-----------------------------------------------------------------
 
 subroutine get_var_owner_index(var_number, owner, owners_index)
-!!!subroutine get_var_owner_index(var_number, owner, owners_index, distribution_type)
 
 ! Given the var number, returns which PE stores it when var complete
 ! and its index in that pes local storage. Depends on distribution_type
@@ -729,10 +769,10 @@
 
 integer, intent(in)  :: var_number
 integer, intent(out) :: owner, owners_index
-!!!integer, intent(in) :: distribution_type
 
 integer :: div
 
+! Asummes distribution type 1
 div = (var_number - 1) / num_pes
 owner = var_number - div * num_pes - 1
 owners_index = div + 1
@@ -855,7 +895,7 @@
 
 integer,  allocatable :: var_list(:), copy_list(:)
 real(r8), allocatable :: transfer_temp(:)
-integer               :: num_copies, num_vars, my_num_vars, my_num_copies
+integer               :: num_copies, num_vars, my_num_vars, my_num_copies, my_pe
 integer               :: max_num_vars, max_num_copies, num_copies_to_receive
 integer               :: sending_pe, recv_pe, k, sv, num_vars_to_send, copy
 integer               :: global_ens_index
@@ -876,6 +916,7 @@
 num_vars      = ens_handle%num_vars
 my_num_vars   = ens_handle%my_num_vars
 my_num_copies = ens_handle%my_num_copies
+my_pe         = ens_handle%my_pe
 
 ! What is maximum number of vars stored on a copy complete pe?
 max_num_vars = get_max_num_vars(num_vars)
@@ -910,7 +951,7 @@
               else
                  if (num_copies_to_receive > 0) then
                     ! Otherwise, receive this part from the sending pe
-                    call receive_from(sending_pe, transfer_temp(1:my_num_vars))
+                    call receive_from(map_pe_to_task(ens_handle, sending_pe), transfer_temp(1:my_num_vars))
    
                     ! Copy the transfer array to my local storage
                     ens_handle%copies(global_ens_index, :) = transfer_temp(1:my_num_vars)
@@ -927,7 +968,7 @@
               ! Have to use temp because %var section is not contiguous storage
               transfer_temp(sv) = ens_handle%vars(var_list(sv), k)
            enddo
-           call send_to(recv_pe, transfer_temp(1:num_vars_to_send))
+           call send_to(map_pe_to_task(ens_handle, recv_pe), transfer_temp(1:num_vars_to_send))
         end do
       
      endif
@@ -958,7 +999,7 @@
                  ens_handle%copies(global_ens_index, :) = transfer_temp(1:num_vars_to_send)
                else
                  ! Otherwise, ship this off
-                 call send_to(recv_pe, transfer_temp(1:num_vars_to_send))
+                 call send_to(map_pe_to_task(ens_handle, recv_pe), transfer_temp(1:num_vars_to_send))
                endif
              end do ALL_MY_COPIES_SEND_LOOP
            endif
@@ -971,7 +1012,7 @@
         do copy = 1, num_copies_to_receive
           if (my_num_vars > 0) then
             ! Have to  use temp because %copies section is not contiguous storage
-            call receive_from(sending_pe, transfer_temp(1:my_num_vars))
+            call receive_from(map_pe_to_task(ens_handle, sending_pe), transfer_temp(1:my_num_vars))
             ! Figure out which global ensemble member this is
             global_ens_index = copy_list(copy)
             ! Store this chunk in my local storage
@@ -1008,7 +1049,7 @@
 
 integer,  allocatable :: var_list(:), copy_list(:)
 real(r8), allocatable :: transfer_temp(:)
-integer               :: num_copies, num_vars, my_num_vars, my_num_copies
+integer               :: num_copies, num_vars, my_num_vars, my_num_copies, my_pe
 integer               :: max_num_vars, max_num_copies, num_vars_to_receive
 integer               :: sending_pe, recv_pe, k, sv, copy, num_copies_to_send
 integer               :: global_ens_index
@@ -1029,6 +1070,7 @@
 num_vars      = ens_handle%num_vars
 my_num_vars   = ens_handle%my_num_vars
 my_num_copies = ens_handle%my_num_copies
+my_pe         = ens_handle%my_pe
 
 ! What is maximum number of vars stored on a copy complete pe?
 max_num_vars = get_max_num_vars(num_vars)
@@ -1041,53 +1083,51 @@
 
 
 if (use_copy2var_send_loop .eqv. .true. ) then
-!HK Switched loop index from receiving_pe to sending_pe
+! Switched loop index from receiving_pe to sending_pe
 ! Aim: to make the commication scale better on Yellowstone, as num_pes >> ens_size
 ! For small numbers of tasks (32 or less) the recieving_pe loop may be faster.
 ! Namelist option use_copy2var_send_loop can be used to select which
 ! communication pattern to use
 !    Default: use sending_pe loop (use_copy2var_send_loop = .true.)
 
-SEND_LOOP: do sending_pe = 0, num_pes - 1
+SENDING_PE_LOOP: do sending_pe = 0, num_pes - 1
+ 
+   if (my_pe /= sending_pe ) then
 
-   if (my_task_id() /= sending_pe ) then
-
       ! figure out what piece to recieve from each other PE and recieve it
       call get_var_list(num_vars, sending_pe, var_list, num_vars_to_receive)
 
-      if (num_vars_to_receive > 0) then
+      if( num_vars_to_receive > 0 ) then
          ! Loop to receive these vars for each copy stored on my_pe
          ALL_MY_COPIES_RECV_LOOP: do k = 1, my_num_copies
 
-            call receive_from(sending_pe, transfer_temp(1:num_vars_to_receive))
-
+            call receive_from(map_pe_to_task(ens_handle, sending_pe), transfer_temp(1:num_vars_to_receive))
             ! Copy the transfer array to my local storage
             do sv = 1, num_vars_to_receive
                ens_handle%vars(var_list(sv), k) = transfer_temp(sv)
             enddo
+
          enddo ALL_MY_COPIES_RECV_LOOP
       endif
 
    else
 
       do recv_pe = 0, num_pes - 1
+      ! I'm the sending PE, figure out what copies of my vars I'll send
+      call get_copy_list(num_copies, recv_pe, copy_list, num_copies_to_send)
 
-         ! I'm the sending PE, figure out what copies of my vars I'll send
-         call get_copy_list(num_copies, recv_pe, copy_list, num_copies_to_send)
-
          SEND_COPIES: do copy = 1, num_copies_to_send
-
-            if (my_task_id() /= recv_pe ) then
-
+            if (my_pe /= recv_pe ) then
                if (my_num_vars > 0) then
                   transfer_temp(1:my_num_vars) = ens_handle%copies(copy_list(copy), :)
                   ! Have to  use temp because %copies section is not contiguous storage
-                  call send_to(recv_pe, transfer_temp(1:my_num_vars))
+                  call send_to(map_pe_to_task(ens_handle, recv_pe), transfer_temp(1:my_num_vars))
                endif
+
             else
+
                ! figure out what piece to recieve from myself and recieve it
                call get_var_list(num_vars, sending_pe, var_list, num_vars_to_receive)
-
                do k = 1,  my_num_copies
                   ! sending to yourself so just copy
                   global_ens_index = ens_handle%my_copies(k)
@@ -1098,9 +1138,10 @@
             endif
          enddo SEND_COPIES
       enddo
+
    endif
 
-enddo SEND_LOOP
+enddo SENDING_PE_LOOP
 
 else ! use old communication pattern
 
@@ -1125,7 +1166,7 @@
             else
                if (num_vars_to_receive > 0) then
                   ! Otherwise, receive this part from the sending pe
-                  call receive_from(sending_pe, transfer_temp(1:num_vars_to_receive)) 
+                  call receive_from(map_pe_to_task(ens_handle, sending_pe), transfer_temp(1:num_vars_to_receive))
    
                   ! Copy the transfer array to my local storage
                   do sv = 1, num_vars_to_receive
@@ -1143,7 +1184,7 @@
          if (my_num_vars > 0) then
             transfer_temp(1:my_num_vars) = ens_handle%copies(copy_list(copy), :)
             ! Have to  use temp because %copies section is not contiguous storage
-            call send_to(recv_pe, transfer_temp(1:my_num_vars))
+            call send_to(map_pe_to_task(ens_handle, recv_pe), transfer_temp(1:my_num_vars))
          endif
       end do
       
@@ -1263,7 +1304,7 @@
 if (should_output) then
    old_flag = do_output()
    call set_output(.true.)
-   write(tbuf, "(A,I4,A)") 'PE', my_pe, ': '//trim(msg)
+   write(tbuf, "(A,I4,A)") 'Task', my_task_id(), ': '//trim(msg)
    call timestamp(trim(tbuf), pos='brief')  ! was debug
    call set_output(old_flag)
 endif
@@ -1272,4 +1313,193 @@
 
 !--------------------------------------------------------------------------------
 
+subroutine assign_tasks_to_pes(ens_handle, nEns_members, layout_type)
+! Calulate the task layout based on the tasks per node and the total number of tasks.
+! Allows the user to spread out the ensemble members as much as possible to balance 
+! memory usage between nodes.
+!
+! Possible options:
+!   1. Standard task layout - first n tasks have the ensemble members my_pe = my_task_id()
+!   2. Round-robin on the nodes
+
+type(ensemble_type), intent(inout)    :: ens_handle
+integer,             intent(in)       :: nEns_members
+integer,             intent(inout)    :: layout_type
+
+if (layout_type /= 1 .and. layout_type /=2) call error_handler(E_ERR,'assign_tasks_to_pes', &
+    'not a valid layout_type, must be 1 (standard) or 2 (round-robin)',source,revision,revdate)
+
+if (nEns_members >= num_pes) then   ! if nEns_members >= task_count() then don't try to spread them out
+   call simple_layout(ens_handle, num_pes)
+   return
+endif
+
+if (tasks_per_node >= num_pes) then ! all tasks are on one node, don't try to spread them out
+   call simple_layout(ens_handle, num_pes)
+   return
+endif
+
+if (layout_type == 1) then 
+   call simple_layout(ens_handle, num_pes)
+else
+   call round_robin(ens_handle)
+endif
+
+end subroutine assign_tasks_to_pes
+
+!------------------------------------------------------------------------------
+subroutine round_robin(ens_handle)
+! Round-robin MPI task layout starting at the first node.  
+! Starting on the first node forces pe 0 = task 0. 
+! The smoother code assumes task 0 has an ensemble member.
+! If you want to break the assumption that pe 0 = task 0, this routine is a good 
+! place to start. Test with the smoother. 
+
+type(ensemble_type), intent(inout)   :: ens_handle
+integer                              :: last_node_task_number, num_nodes
+integer                              :: i, j
+integer, allocatable                 :: count(:)
+
+
+! Find number of nodes and find number of tasks on last node
+call calc_tasks_on_each_node(num_nodes, last_node_task_number)
+
+allocate(count(num_nodes))
+
+count(:) = 1  ! keep track of the pes assigned to each node
+i = 0         ! keep track of the # of pes assigned
+
+do while (i < num_pes)   ! until you run out of processors
+   do j = 1, num_nodes   ! loop around the nodes
+
+      if(j == num_nodes) then  ! special case for the last node - it could have fewer tasks than the other nodes
+         if(count(j) <= last_node_task_number) then
+            ens_handle%task_to_pe_list(tasks_per_node*(j-1) + count(j)) = i
+            count(j) = count(j) + 1
+            i = i + 1
+         endif
+      else
+         if(count(j) <= tasks_per_node) then
+            ens_handle%task_to_pe_list(tasks_per_node*(j-1) + count(j)) = i
+            count(j) = count(j) + 1
+            i = i + 1
+         endif
+      endif
+
+   enddo
+enddo
+
+deallocate(count)
+
+call create_pe_to_task_list(ens_handle)
+
+end subroutine round_robin
+!-------------------------------------------------------------------------------
+subroutine create_pe_to_task_list(ens_handle)
+! Creates the ens_handle%pe_to_task_list
+! ens_handle%task_to_pe_list must have been assigned first, otherwise this 
+! routine will just return nonsense. 
+
+!FIXME set ens_handle%task_to_pe_list to -1 when it is allocated, then test if has been changed
+
+type(ensemble_type), intent(inout)   :: ens_handle
+integer                              :: temp_sort(num_pes), idx(num_pes)
+integer                              :: ii
+
+temp_sort = ens_handle%task_to_pe_list
+call sort_task_list(temp_sort, idx, num_pes)
+
+do ii = 1, num_pes
+   ens_handle%pe_to_task_list(ii) = temp_sort(idx(ii))
+enddo
+
+end subroutine create_pe_to_task_list
+
+!-------------------------------------------------------------------------------
+
+subroutine calc_tasks_on_each_node(nodes, last_node_task_number)
+! Finds the of number nodes and how many tasks are on the last node, given the 
+! number of tasks and the tasks_per_node (ptile).
+! The total number of tasks is num_pes = task_count()
+! The last node may have fewer tasks, for example, if ptile = 16 and the number of
+! mpi tasks = 17
+
+integer, intent(out)  :: last_node_task_number, nodes
+
+if ( mod(num_pes, tasks_per_node) == 0) then
+   nodes = num_pes / tasks_per_node
+   last_node_task_number = tasks_per_node
+else
+   nodes = num_pes / tasks_per_node + 1
+   last_node_task_number = tasks_per_node - (nodes*tasks_per_node - num_pes)
+endif
+
+end subroutine calc_tasks_on_each_node
+
+!-----------------------------------------------------------------------------
+subroutine simple_layout(ens_handle, n)
+! assigns the arrays task_to_pe_list and pe_to_task list for the simple layout
+! where my_pe = my_task_id()
+
+type(ensemble_type), intent(inout) :: ens_handle
+integer,             intent(in)    :: n
+integer                            :: ii
+
+do ii = 0, num_pes - 1
+   ens_handle%task_to_pe_list(ii + 1) = ii
+enddo
+
+ens_handle%pe_to_task_list = ens_handle%task_to_pe_list
+
+end subroutine simple_layout
+
+!------------------------------------------------------------------------------
+subroutine sort_task_list(x, idx, n)
+! sorts an array and returns the sorted array, and the index of the original
+! array
+
+integer, intent(in)    :: n 
+integer, intent(inout) :: x(n)   ! array to be sorted
+integer, intent(out)   :: idx(n) ! index of sorted array
+
+integer                :: xcopy(n), i
+
+xcopy = x
+
+call index_sort(x, idx, n)
+
+do i = 1, n
+   x(i) = xcopy(idx(i))
+enddo
+
+end subroutine sort_task_list
+
+!--------------------------------------------------------------------------------
+function map_pe_to_task(ens_handle, p)
+! Return the physical task for my_pe
+
+type(ensemble_type), intent(in) :: ens_handle
+integer,             intent(in)    :: p
+integer                            :: map_pe_to_task
+
+map_pe_to_task = ens_handle%pe_to_task_list(p + 1)
+
+end function map_pe_to_task
+
+!--------------------------------------------------------------------------------
+function map_task_to_pe(ens_handle, t)
+! Return my_pe corresponding to the physical task
+
+type(ensemble_type), intent(in) :: ens_handle
+integer,             intent(in) :: t
+integer                         :: map_task_to_pe
+
+map_task_to_pe = ens_handle%task_to_pe_list(t + 1)
+
+end function map_task_to_pe
+
+!---------------------------------------------------------------------------------
+
 end module ensemble_manager_mod
+
+

Modified: DART/branches/development/ensemble_manager/ensemble_manager_mod.html
===================================================================
--- DART/branches/development/ensemble_manager/ensemble_manager_mod.html	2013-05-21 21:52:18 UTC (rev 6163)
+++ DART/branches/development/ensemble_manager/ensemble_manager_mod.html	2013-05-21 23:15:46 UTC (rev 6164)
@@ -69,7 +69,8 @@
 <div class=namelist>
 <pre>
 <em class=call>namelist / ensemble_manager_nml / </em>
-   single_restart_file_in, single_restart_file_out, perturbation_amplitude
+   single_restart_file_in, single_restart_file_out, perturbation_amplitude,
+   use_copy2var_send_loop, use_var2copy_rec_loop, layout, tasks_per_node
 </pre>
 </div>
 
@@ -112,8 +113,52 @@
       the initial states into a set of members with enough spread and which
       match the current set of observations.
       Default: 0.2_r8</TD></TR>
+<TR><!--contents--><TD valign=top>use_copy2var_send_loop</TD>
+    <!--  type  --><TD>logical</TD>
+    <!--descript--><TD>MPI communication flag* for all_copies_to_all_vars.
+     Default: .true.</TD></TR>
+<TR><!--contents--><TD valign=top>use_var2copy_rec_loop</TD>
+    <!--  type  --><TD>logical</TD>
+    <!--descript--><TD>MPI communication flag* for all_vars_to_all_copies.
+     Default: .true.</TD></TR>
+ <TR><!--contents--><TD valign=top>layout</TD>
+    <!--  type  --><TD valign=top>integer</TD>
+    <!--descript--><TD>Determines the process (pe) layout on MPI tasks.
+     1 is pe = MPI task. 2 is a round-robin layout around the nodes. Layout 2
+     results in a more even usage of memory across nodes. This may allow you to run
+     with a larger state vector without hitting the memory limit of the node.
+     It may give a slight (5%) increase in performance, but this is machine dependent.
+     It has no effect on serial runs of filter.
+     Default: 1</TD></TR>
+<TR><!--contents--><TD valign=top>tasks_per_node</TD>
+    <!--  type  --><TD>integer</TD>
+    <!--descript--><TD>The number of tasks per node.  This
+    is only used if layout = 2. 
+     Default: 1</TD></TR>
 </TABLE>
 
+<P>

@@ Diff output truncated at 40000 characters. @@


More information about the Dart-dev mailing list