[Dart-dev] [6164] DART/branches/development: Added the ability to choose a round_robin MPI task layout to the ensemble manager (the default is to use the standard layout my_pe = my_task_id).
nancy at ucar.edu
nancy at ucar.edu
Tue May 21 17:15:47 MDT 2013
Revision: 6164
Author: hkershaw
Date: 2013-05-21 17:15:46 -0600 (Tue, 21 May 2013)
Log Message:
Added the ability to choose a round_robin MPI task layout to the ensemble manager (the default is to use the standard layout my_pe = my_task_id). Several other routines have been changed to use my_pe instead of my_task_id. Pe 0 is forced to be task 0 in the round-robin layout. This is a conservative measure for the release to avoid any unforseen problems arising from any code making the assumption that task 0 has a copy. For the future we may want to violate this assumption. filter_state_space_diagnostics was changed to accept the current ensemble time as an arguement. smoother_ss_diagnosics still assumes that task 0 has an ensemble copy. Note that not all files listed in this commit contain code changes, some only contain svn mergeinfo changes. Added documentation to ensemble_manager_mod.html for the layout and for communication loops. Note: we need better names for the communication loop flags.
Modified Paths:
Added Paths:
Property Changed:
-------------- next part --------------
Property changes on: DART/branches/development/adaptive_inflate
Modified: svn:mergeinfo
- /DART/branches/inf_restart:4784-4812
+ /DART/branches/helen/adaptive_inflate:5995-6161
Modified: DART/branches/development/adaptive_inflate/adaptive_inflate_mod.f90
--- DART/branches/development/adaptive_inflate/adaptive_inflate_mod.f90 2013-05-21 21:52:18 UTC (rev 6163)
+++ DART/branches/development/adaptive_inflate/adaptive_inflate_mod.f90 2013-05-21 23:15:46 UTC (rev 6164)
@@ -18,7 +18,8 @@
error_handler, E_ERR, E_MSG
use random_seq_mod, only : random_seq_type, random_gaussian, init_random_seq
use ensemble_manager_mod, only : ensemble_type, all_vars_to_all_copies, all_copies_to_all_vars, &
- read_ensemble_restart, write_ensemble_restart, get_copy_owner_index
+ read_ensemble_restart, write_ensemble_restart, get_copy_owner_index, &
+ map_pe_to_task
use mpi_utilities_mod, only : my_task_id, send_to, receive_from
implicit none
@@ -205,11 +206,11 @@
! a transpose.
if (.not. mean_from_restart) then
call get_copy_owner_index(ss_inflate_index, owner, owners_index)
- if (owner == my_task_id()) ens_handle%vars(:, owners_index) = inf_initial
+ if (owner == ens_handle%my_pe) ens_handle%vars(:, owners_index) = inf_initial
if (.not. sd_from_restart) then
call get_copy_owner_index(ss_inflate_sd_index, owner, owners_index)
- if (owner == my_task_id()) ens_handle%vars(:, owners_index) = sd_initial
+ if (owner == ens_handle%my_pe) ens_handle%vars(:, owners_index) = sd_initial
@@ -230,12 +231,12 @@
! someone else has the inf array. have the owner send the min/max
! values to PE0. after this point only PE0 has the right value
! in minmax_mean, but it is the only one who is going to print below.
- if (my_task_id() == 0) then
- call receive_from(owner, minmax_mean)
- else if (my_task_id() == owner) then
+ if (ens_handle%my_pe == 0) then
+ call receive_from(map_pe_to_task(ens_handle, owner), minmax_mean)
+ else if (ens_handle%my_pe == owner) then
minmax_mean(1) = minval(ens_handle%vars(:, owners_index))
minmax_mean(2) = maxval(ens_handle%vars(:, owners_index))
- call send_to(0, minmax_mean)
+ call send_to(map_pe_to_task(ens_handle, 0), minmax_mean)
@@ -250,12 +251,12 @@
! someone else has the sd array. have the owner send the min/max
! values to PE0. after this point only PE0 has the right value
! in minmax_sd, but it is the only one who is going to print below.
- if (my_task_id() == 0) then
- call receive_from(owner, minmax_sd)
- else if (my_task_id() == owner) then
+ if (ens_handle%my_pe == 0) then
+ call receive_from(map_pe_to_task(ens_handle, owner), minmax_sd)
+ else if (ens_handle%my_pe == owner) then
minmax_sd(1) = minval(ens_handle%vars(:, owners_index))
minmax_sd(2) = maxval(ens_handle%vars(:, owners_index))
- call send_to(0, minmax_sd)
+ call send_to(map_pe_to_task(ens_handle, 0), minmax_sd)
@@ -268,9 +269,9 @@
! ensure the entire array contains a single constant value to match what the code uses.
if(inf_flavor == 3) then
call get_copy_owner_index(ss_inflate_index, owner, owners_index)
- if (owner == my_task_id()) ens_handle%vars(:, owners_index) = ens_handle%vars(1, owners_index)
+ if (owner == ens_handle%my_pe) ens_handle%vars(:, owners_index) = ens_handle%vars(1, owners_index)
call get_copy_owner_index(ss_inflate_sd_index, owner, owners_index)
- if (owner == my_task_id()) ens_handle%vars(:, owners_index) = ens_handle%vars(1, owners_index)
+ if (owner == ens_handle%my_pe) ens_handle%vars(:, owners_index) = ens_handle%vars(1, owners_index)
!------ Block for obs. space inflation initialization ------
Modified: DART/branches/development/assim_tools/assim_tools_mod.f90
--- DART/branches/development/assim_tools/assim_tools_mod.f90 2013-05-21 21:52:18 UTC (rev 6163)
+++ DART/branches/development/assim_tools/assim_tools_mod.f90 2013-05-21 23:15:46 UTC (rev 6164)
@@ -40,7 +40,8 @@
LocationDims, vert_is_surface, has_vertical_localization
use ensemble_manager_mod, only : ensemble_type, get_my_num_vars, get_my_vars, &
- compute_copy_mean_var, get_var_owner_index
+ compute_copy_mean_var, get_var_owner_index, &
+ map_pe_to_task
use mpi_utilities_mod, only : my_task_id, broadcast_send, broadcast_recv, &
@@ -489,7 +490,8 @@
! Following block is done only by the owner of this observation
- if(my_task_id() == owner) then
+ if(ens_handle%my_pe == owner) then
obs_qc = obs_ens_handle%copies(OBS_GLOBAL_QC_COPY, owners_index)
! Only value of 0 for DART QC field should be assimilated
IF_QC_IS_OKAY: if(nint(obs_qc) ==0) then
@@ -551,14 +553,14 @@
!Broadcast the info from this obs to all other processes
! What gets broadcast depends on what kind of inflation is being done
if(local_varying_ss_inflate) then
- call broadcast_send(owner, obs_prior, obs_inc, orig_obs_prior_mean, &
+ call broadcast_send(map_pe_to_task(ens_handle, owner), obs_prior, obs_inc, orig_obs_prior_mean, &
orig_obs_prior_var, net_a, scalar1=obs_qc)
else if(local_single_ss_inflate .or. local_obs_inflate) then
- call broadcast_send(owner, obs_prior, obs_inc, net_a, &
+ call broadcast_send(map_pe_to_task(ens_handle, owner), obs_prior, obs_inc, net_a, &
scalar1=my_inflate, scalar2=my_inflate_sd, scalar3=obs_qc)
- call broadcast_send(owner, obs_prior, obs_inc, net_a, scalar1=obs_qc)
+ call broadcast_send(map_pe_to_task(ens_handle, owner), obs_prior, obs_inc, net_a, scalar1=obs_qc)
! Next block is done by processes that do NOT own this observation
@@ -567,13 +569,13 @@
! I don't store this obs; receive the obs prior and increment from broadcast
! Also get qc and inflation information if needed
if(local_varying_ss_inflate) then
- call broadcast_recv(owner, obs_prior, obs_inc, orig_obs_prior_mean, &
+ call broadcast_recv(map_pe_to_task(ens_handle, owner), obs_prior, obs_inc, orig_obs_prior_mean, &
orig_obs_prior_var, net_a, scalar1=obs_qc)
else if(local_single_ss_inflate .or. local_obs_inflate) then
- call broadcast_recv(owner, obs_prior, obs_inc, net_a, &
+ call broadcast_recv(map_pe_to_task(ens_handle, owner), obs_prior, obs_inc, net_a, &
scalar1=my_inflate, scalar2=my_inflate_sd, scalar3=obs_qc)
- call broadcast_recv(owner, obs_prior, obs_inc, net_a, scalar1=obs_qc)
+ call broadcast_recv(map_pe_to_task(ens_handle, owner), obs_prior, obs_inc, net_a, scalar1=obs_qc)
@@ -651,7 +653,7 @@
rev_num_close_obs = count_close(num_close_obs, close_obs_ind, my_obs_kind, &
close_obs_dist, cutoff_rev*2.0_r8)
! GSR output the new cutoff
! Here is what we might want:
! time, ob index #, ob location, new cutoff, the assimilate obs count, owner (which process has this ob)
Property changes on: DART/branches/development/assim_tools/assim_tools_mod.f90
Modified: svn:mergeinfo
- /DART/releases/Kodiak/assim_tools/assim_tools_mod.f90:5020-5583
+ /DART/branches/helen/assim_tools/assim_tools_mod.f90:5995-6161
Added: DART/branches/development/doc/html/comm_pattern512.png
(Binary files differ)
Property changes on: DART/branches/development/doc/html/comm_pattern512.png
Added: svn:mime-type
+ application/octet-stream
Added: DART/branches/development/doc/html/comm_pattern96.png
(Binary files differ)
Property changes on: DART/branches/development/doc/html/comm_pattern96.png
Added: svn:mime-type
+ application/octet-stream
Modified: DART/branches/development/ensemble_manager/ensemble_manager_mod.f90
--- DART/branches/development/ensemble_manager/ensemble_manager_mod.f90 2013-05-21 21:52:18 UTC (rev 6163)
+++ DART/branches/development/ensemble_manager/ensemble_manager_mod.f90 2013-05-21 23:15:46 UTC (rev 6164)
@@ -29,6 +29,7 @@
use time_manager_mod, only : time_type, set_time
use random_seq_mod, only : random_seq_type, init_random_seq, random_gaussian
use mpi_utilities_mod, only : task_count, my_task_id, send_to, receive_from, task_sync
+use sort_mod, only : index_sort
implicit none
@@ -49,7 +50,8 @@
get_my_vars, compute_copy_mean, compute_copy_mean_sd, &
get_copy, put_copy, all_vars_to_all_copies, &
all_copies_to_all_vars, read_ensemble_restart, write_ensemble_restart, &
- compute_copy_mean_var, get_copy_owner_index, set_ensemble_time
+ compute_copy_mean_var, get_copy_owner_index, set_ensemble_time, &
+ map_task_to_pe, map_pe_to_task
type ensemble_type
@@ -63,6 +65,11 @@
! Time is only related to var complete
type(time_type), pointer :: time(:)
integer :: distribution_type
+ integer, allocatable :: task_to_pe_list(:), pe_to_task_list(:) ! List of tasks
+ ! Flexible my_pe, layout_type which allows different task layouts for different ensemble handles
+ integer :: my_pe
+ integer :: layout_type
end type ensemble_type
! Logical flag for initialization of module
@@ -72,7 +79,7 @@
character(len = 129) :: errstring
! Module storage for pe information for this process avoids recomputation
-integer :: my_pe, num_pes
+integer :: num_pes
@@ -86,15 +93,22 @@
logical :: single_restart_file_out = .true.
! Size of perturbations for creating ensembles when model won't do it
real(r8) :: perturbation_amplitude = 0.2_r8
-! Options to change order of loops in the transposes
+! Options to change order of communication loops in the transpose routines
logical :: use_copy2var_send_loop = .true.
logical :: use_var2copy_rec_loop = .true.
+! task layout options:
+integer :: layout = 1 ! default to my_pe = my_task_id(). Layout2 assumes that the user knows the correct tasks_per_node
+integer :: tasks_per_node = 1 ! default to 1 if the user does not specify a number of tasks per node.
+logical :: debug = .false.
namelist / ensemble_manager_nml / single_restart_file_in, &
single_restart_file_out, &
perturbation_amplitude, &
use_copy2var_send_loop, &
- use_var2copy_rec_loop
+ use_var2copy_rec_loop, &
+ layout, tasks_per_node, &
+ debug
@@ -102,11 +116,12 @@
subroutine init_ensemble_manager(ens_handle, num_copies, &
- num_vars, distribution_type_in)
+ num_vars, distribution_type_in, layout_type)
type(ensemble_type), intent(out) :: ens_handle
integer, intent(in) :: num_copies, num_vars
integer, intent(in), optional :: distribution_type_in
+integer, intent(in), optional :: layout_type
integer :: iunit, io
@@ -136,13 +151,37 @@
! Get mpi information for this process; it's stored in module storage
num_pes = task_count()
- my_pe = my_task_id()
+! Optional layout_type argument to assign how my_pe is related to my_task_id
+! layout_type can be set individually for each ensemble handle. It is not advisable to do this
+! because get_obs_ens assumes that the layout is the same for each ensemble handle.
+if(.not. present(layout_type) ) then
+ ens_handle%layout_type = layout ! namelist option
+ ens_handle%layout_type = layout_type
+! Check for error: only layout_types 1 and 2 are implemented
+if (ens_handle%layout_type /= 1 .and. ens_handle%layout_type /=2 ) then
+ call error_handler(E_ERR, 'init_ensemble_manager', 'only layout values 1 (standard), 2(round-robin) allowed ', source, revision, revdate)
+call assign_tasks_to_pes(ens_handle, num_copies, ens_handle%layout_type)
+ens_handle%my_pe = map_task_to_pe(ens_handle, my_task_id())
! Set the global storage bounds for the number of copies and variables
ens_handle%num_copies = num_copies
ens_handle%num_vars = num_vars
+if(debug .and. my_task_id()==0) then
+ print*, 'pe_to_task_list', ens_handle%pe_to_task_list
+ print*, 'task_to_pe_list', ens_handle%task_to_pe_list
! Figure out how the ensemble copies are partitioned
call set_up_ens_distribution(ens_handle)
@@ -199,15 +238,15 @@
!-------- Block for single restart file or single member being perturbed -----
if(single_restart_file_in .or. .not. start_from_restart .or. &
single_file_override) then
- ! Single restart file is read only by master_pe and then distributed
- if(my_pe == 0) iunit = open_restart_read(file_name)
+ ! Single restart file is read only by task 0 and then distributed
+ if(my_task_id() == 0) iunit = open_restart_read(file_name)
allocate(ens(ens_handle%num_vars)) ! used to be on stack.
! Loop through the total number of copies
do i = start_copy, end_copy
- ! Only master_pe does reading. Everybody can do their own perturbing
- if(my_pe == 0) then
- ! Read restarts in sequence; only read once for not start_from_restart
+ ! Only task 0 does reading. Everybody can do their own perturbing
+ if(my_task_id() == 0) then
+ ! Read restarts in sequence; only read once for not start_from_restart
if(start_from_restart .or. i == start_copy) &
call aread_state_restart(ens_time, ens, iunit)
! Override this time if requested by namelist
@@ -215,15 +254,16 @@
! Store this copy in the appropriate place on the appropriate process
- call put_copy(0, ens_handle, i, ens, ens_time)
+ ! map from my_pe to physical task number all done in send and recieves only
+ call put_copy(map_task_to_pe(ens_handle,0), ens_handle, i, ens, ens_time)
end do
- ! Master pe must close the file it's been reading
- if(my_pe == 0) call close_restart(iunit)
+ ! Task 0 must close the file it's been reading
+ if(my_task_id() == 0) call close_restart(iunit)
!----------- Block that follows is for multiple restart files -----------
! Loop to read in all my ensemble members
READ_MULTIPLE_RESTARTS: do i = 1, ens_handle%my_num_copies
@@ -299,17 +339,18 @@
! Need to force single restart file for inflation files
if(single_restart_file_out .or. single_file_forced) then
- ! Single restart file is written only by the master_pe
- if(my_pe == 0) then
- iunit = open_restart_write(file_name)
+ ! Single restart file is written only by task 0
+ if(my_task_id() == 0) then
+ iunit = open_restart_write(file_name)
! Loop to write each ensemble member
allocate(ens(ens_handle%num_vars)) ! used to be on stack.
do i = start_copy, end_copy
! Figure out where this ensemble member is being stored
call get_copy_owner_index(i, owner, owners_index)
- ! If it's on the master pe, just write it
- if(owner == 0) then
+ ! If it's on task 0, just write it
+ if(map_pe_to_task(ens_handle, owner) == 0) then
call awrite_state_restart(ens_handle%time(owners_index), &
ens_handle%vars(:, owners_index), iunit)
@@ -317,15 +358,13 @@
! This communication assumes index numbers are monotonically increasing
! and that communications is blocking so that there are not multiple
! outstanding messages from the same processors (also see send_to below).
- call receive_from(owner, ens, ens_time)
+ call receive_from(map_pe_to_task(ens_handle, owner), ens, ens_time)
call awrite_state_restart(ens_time, ens, iunit)
end do
call close_restart(iunit)
- else
- ! If I'm not the master pe with a single restart, I must send my copies
- ! to master pe for writing to file
+ else ! I must send my copies to task 0 for writing to file
do i = 1, ens_handle%my_num_copies
! Figure out which global index this is
global_index = ens_handle%my_copies(i)
@@ -393,9 +432,9 @@
call get_copy_owner_index(copy, owner, owners_index)
!----------- Block of code that must be done by receiving pe -----------------------------
-if(my_pe == receiving_pe) then
+if(ens_handle%my_pe == receiving_pe) then
! If PE that stores is the same, just copy and return
- if(my_pe == owner) then
+ if(ens_handle%my_pe == owner) then
vars = ens_handle%vars(:, owners_index)
if(present(mtime)) mtime = ens_handle%time(owners_index)
! If I'm the receiving PE and also the owner, I'm all finished; return
@@ -403,16 +442,17 @@
! Otherwise, must wait to receive vars and time from storing pe
- call receive_from(owner, vars, mtime)
+ call receive_from(map_pe_to_task(ens_handle, owner), vars, mtime)
!----- Block of code that must be done by PE that stores the copy IF it is NOT receiver -----
-if(my_pe == owner) then
+if(ens_handle%my_pe == owner) then
! Send copy to receiving pe
if(present(mtime)) then
- call send_to(receiving_pe, ens_handle%vars(:, owners_index), ens_handle%time(owners_index))
+ call send_to(map_pe_to_task(ens_handle, receiving_pe), ens_handle%vars(:, owners_index), ens_handle%time(owners_index))
- call send_to(receiving_pe, ens_handle%vars(:, owners_index))
+ call send_to(map_pe_to_task(ens_handle, receiving_pe), ens_handle%vars(:, owners_index))
!------ End of block ---------------------------------------------------------------------
@@ -454,9 +494,9 @@
call get_copy_owner_index(copy, owner, owners_index)
! Block of code that must be done by PE that is to send the copy
-if(my_pe == sending_pe) then
+if(ens_handle%my_pe == sending_pe) then
! If PE that stores is the same, just copy and return
- if(my_pe == owner) then
+ if(ens_handle%my_pe == owner) then
ens_handle%vars(:, owners_index) = vars
if(present(mtime)) ens_handle%time(owners_index) = mtime
! If I'm the sending PE and also the owner, I'm all finished; return
@@ -464,16 +504,17 @@
! Otherwise, must send vars and possibly time to storing pe
- call send_to(owner, vars, mtime)
+ call send_to(map_pe_to_task(ens_handle, owner), vars, mtime)
! Block of code that must be done by PE that stores the copy IF it is NOT sender
-if(my_pe == owner) then
+if(ens_handle%my_pe == owner) then
! Need to receive copy from sending_pe
if(present(mtime)) then
- call receive_from(sending_pe, ens_handle%vars(:, owners_index), ens_handle%time(owners_index))
+ call receive_from(map_pe_to_task(ens_handle, sending_pe), ens_handle%vars(:, owners_index), ens_handle%time(owners_index))
- call receive_from(sending_pe, ens_handle%vars(:, owners_index))
+ call receive_from(map_pe_to_task(ens_handle, sending_pe), ens_handle%vars(:, owners_index))
@@ -527,7 +568,7 @@
! Free up the allocated storage
deallocate(ens_handle%my_copies, ens_handle%time, ens_handle%my_vars, &
- ens_handle%vars, ens_handle%copies)
+ ens_handle%vars, ens_handle%copies, ens_handle%task_to_pe_list, ens_handle%pe_to_task_list)
end subroutine end_ensemble_manager
@@ -658,7 +699,7 @@
! Compute the total number of copies I'll get for var complete
num_per_pe_below = ens_handle%num_copies / num_pes
num_left_over = ens_handle%num_copies - num_per_pe_below * num_pes
-if(num_left_over >= (my_pe + 1)) then
+if(num_left_over >= (ens_handle%my_pe + 1)) then
ens_handle%my_num_copies = num_per_pe_below + 1
ens_handle%my_num_copies = num_per_pe_below
@@ -667,7 +708,7 @@
! Do the same thing for copy complete: figure out which vars I get
num_per_pe_below = ens_handle%num_vars / num_pes
num_left_over = ens_handle%num_vars - num_per_pe_below * num_pes
-if(num_left_over >= (my_pe + 1)) then
+if(num_left_over >= (ens_handle%my_pe + 1)) then
ens_handle%my_num_vars = num_per_pe_below + 1
ens_handle%my_num_vars = num_per_pe_below
@@ -685,22 +726,22 @@
ens_handle%copies = MISSING_R8
! Fill out the number of my members
-call get_copy_list(ens_handle%num_copies, my_pe, ens_handle%my_copies, i)
+call get_copy_list(ens_handle%num_copies, ens_handle%my_pe, ens_handle%my_copies, i)
! Initialize times to missing
+! This is only initializing times for pes that have ensemble copies
do i = 1, ens_handle%my_num_copies
ens_handle%time(i) = set_time(0, 0)
end do
! Fill out the number of my vars
-call get_var_list(ens_handle%num_vars, my_pe, ens_handle%my_vars, i)
+call get_var_list(ens_handle%num_vars, ens_handle%my_pe, ens_handle%my_vars, i)
end subroutine set_up_ens_distribution
subroutine get_copy_owner_index(copy_number, owner, owners_index)
-!!!subroutine get_copy_owner_index(copy_number, owner, owners_index, distribution_type)
! Given the copy number, returns which PE stores it when copy complete
! and its index in that pes local storage. Depends on distribution_type
@@ -708,10 +749,10 @@
integer, intent(in) :: copy_number
integer, intent(out) :: owner, owners_index
-!!!integer, intent(in) :: distribution_type
integer :: div
+! Asummes distribution type 1
div = (copy_number - 1) / num_pes
owner = copy_number - div * num_pes - 1
owners_index = div + 1
@@ -721,7 +762,6 @@
subroutine get_var_owner_index(var_number, owner, owners_index)
-!!!subroutine get_var_owner_index(var_number, owner, owners_index, distribution_type)
! Given the var number, returns which PE stores it when var complete
! and its index in that pes local storage. Depends on distribution_type
@@ -729,10 +769,10 @@
integer, intent(in) :: var_number
integer, intent(out) :: owner, owners_index
-!!!integer, intent(in) :: distribution_type
integer :: div
+! Asummes distribution type 1
div = (var_number - 1) / num_pes
owner = var_number - div * num_pes - 1
owners_index = div + 1
@@ -855,7 +895,7 @@
integer, allocatable :: var_list(:), copy_list(:)
real(r8), allocatable :: transfer_temp(:)
-integer :: num_copies, num_vars, my_num_vars, my_num_copies
+integer :: num_copies, num_vars, my_num_vars, my_num_copies, my_pe
integer :: max_num_vars, max_num_copies, num_copies_to_receive
integer :: sending_pe, recv_pe, k, sv, num_vars_to_send, copy
integer :: global_ens_index
@@ -876,6 +916,7 @@
num_vars = ens_handle%num_vars
my_num_vars = ens_handle%my_num_vars
my_num_copies = ens_handle%my_num_copies
+my_pe = ens_handle%my_pe
! What is maximum number of vars stored on a copy complete pe?
max_num_vars = get_max_num_vars(num_vars)
@@ -910,7 +951,7 @@
if (num_copies_to_receive > 0) then
! Otherwise, receive this part from the sending pe
- call receive_from(sending_pe, transfer_temp(1:my_num_vars))
+ call receive_from(map_pe_to_task(ens_handle, sending_pe), transfer_temp(1:my_num_vars))
! Copy the transfer array to my local storage
ens_handle%copies(global_ens_index, :) = transfer_temp(1:my_num_vars)
@@ -927,7 +968,7 @@
! Have to use temp because %var section is not contiguous storage
transfer_temp(sv) = ens_handle%vars(var_list(sv), k)
- call send_to(recv_pe, transfer_temp(1:num_vars_to_send))
+ call send_to(map_pe_to_task(ens_handle, recv_pe), transfer_temp(1:num_vars_to_send))
end do
@@ -958,7 +999,7 @@
ens_handle%copies(global_ens_index, :) = transfer_temp(1:num_vars_to_send)
! Otherwise, ship this off
- call send_to(recv_pe, transfer_temp(1:num_vars_to_send))
+ call send_to(map_pe_to_task(ens_handle, recv_pe), transfer_temp(1:num_vars_to_send))
@@ -971,7 +1012,7 @@
do copy = 1, num_copies_to_receive
if (my_num_vars > 0) then
! Have to use temp because %copies section is not contiguous storage
- call receive_from(sending_pe, transfer_temp(1:my_num_vars))
+ call receive_from(map_pe_to_task(ens_handle, sending_pe), transfer_temp(1:my_num_vars))
! Figure out which global ensemble member this is
global_ens_index = copy_list(copy)
! Store this chunk in my local storage
@@ -1008,7 +1049,7 @@
integer, allocatable :: var_list(:), copy_list(:)
real(r8), allocatable :: transfer_temp(:)
-integer :: num_copies, num_vars, my_num_vars, my_num_copies
+integer :: num_copies, num_vars, my_num_vars, my_num_copies, my_pe
integer :: max_num_vars, max_num_copies, num_vars_to_receive
integer :: sending_pe, recv_pe, k, sv, copy, num_copies_to_send
integer :: global_ens_index
@@ -1029,6 +1070,7 @@
num_vars = ens_handle%num_vars
my_num_vars = ens_handle%my_num_vars
my_num_copies = ens_handle%my_num_copies
+my_pe = ens_handle%my_pe
! What is maximum number of vars stored on a copy complete pe?
max_num_vars = get_max_num_vars(num_vars)
@@ -1041,53 +1083,51 @@
if (use_copy2var_send_loop .eqv. .true. ) then
-!HK Switched loop index from receiving_pe to sending_pe
+! Switched loop index from receiving_pe to sending_pe
! Aim: to make the commication scale better on Yellowstone, as num_pes >> ens_size
! For small numbers of tasks (32 or less) the recieving_pe loop may be faster.
! Namelist option use_copy2var_send_loop can be used to select which
! communication pattern to use
! Default: use sending_pe loop (use_copy2var_send_loop = .true.)
-SEND_LOOP: do sending_pe = 0, num_pes - 1
+SENDING_PE_LOOP: do sending_pe = 0, num_pes - 1
+ if (my_pe /= sending_pe ) then
- if (my_task_id() /= sending_pe ) then
! figure out what piece to recieve from each other PE and recieve it
call get_var_list(num_vars, sending_pe, var_list, num_vars_to_receive)
- if (num_vars_to_receive > 0) then
+ if( num_vars_to_receive > 0 ) then
! Loop to receive these vars for each copy stored on my_pe
ALL_MY_COPIES_RECV_LOOP: do k = 1, my_num_copies
- call receive_from(sending_pe, transfer_temp(1:num_vars_to_receive))
+ call receive_from(map_pe_to_task(ens_handle, sending_pe), transfer_temp(1:num_vars_to_receive))
! Copy the transfer array to my local storage
do sv = 1, num_vars_to_receive
ens_handle%vars(var_list(sv), k) = transfer_temp(sv)
do recv_pe = 0, num_pes - 1
+ ! I'm the sending PE, figure out what copies of my vars I'll send
+ call get_copy_list(num_copies, recv_pe, copy_list, num_copies_to_send)
- ! I'm the sending PE, figure out what copies of my vars I'll send
- call get_copy_list(num_copies, recv_pe, copy_list, num_copies_to_send)
SEND_COPIES: do copy = 1, num_copies_to_send
- if (my_task_id() /= recv_pe ) then
+ if (my_pe /= recv_pe ) then
if (my_num_vars > 0) then
transfer_temp(1:my_num_vars) = ens_handle%copies(copy_list(copy), :)
! Have to use temp because %copies section is not contiguous storage
- call send_to(recv_pe, transfer_temp(1:my_num_vars))
+ call send_to(map_pe_to_task(ens_handle, recv_pe), transfer_temp(1:my_num_vars))
! figure out what piece to recieve from myself and recieve it
call get_var_list(num_vars, sending_pe, var_list, num_vars_to_receive)
do k = 1, my_num_copies
! sending to yourself so just copy
global_ens_index = ens_handle%my_copies(k)
@@ -1098,9 +1138,10 @@
-enddo SEND_LOOP
else ! use old communication pattern
@@ -1125,7 +1166,7 @@
if (num_vars_to_receive > 0) then
! Otherwise, receive this part from the sending pe
- call receive_from(sending_pe, transfer_temp(1:num_vars_to_receive))
+ call receive_from(map_pe_to_task(ens_handle, sending_pe), transfer_temp(1:num_vars_to_receive))
! Copy the transfer array to my local storage
do sv = 1, num_vars_to_receive
@@ -1143,7 +1184,7 @@
if (my_num_vars > 0) then
transfer_temp(1:my_num_vars) = ens_handle%copies(copy_list(copy), :)
! Have to use temp because %copies section is not contiguous storage
- call send_to(recv_pe, transfer_temp(1:my_num_vars))
+ call send_to(map_pe_to_task(ens_handle, recv_pe), transfer_temp(1:my_num_vars))
end do
@@ -1263,7 +1304,7 @@
if (should_output) then
old_flag = do_output()
call set_output(.true.)
- write(tbuf, "(A,I4,A)") 'PE', my_pe, ': '//trim(msg)
+ write(tbuf, "(A,I4,A)") 'Task', my_task_id(), ': '//trim(msg)
call timestamp(trim(tbuf), pos='brief') ! was debug
call set_output(old_flag)
@@ -1272,4 +1313,193 @@
+subroutine assign_tasks_to_pes(ens_handle, nEns_members, layout_type)
+! Calulate the task layout based on the tasks per node and the total number of tasks.
+! Allows the user to spread out the ensemble members as much as possible to balance
+! memory usage between nodes.
+! Possible options:
+! 1. Standard task layout - first n tasks have the ensemble members my_pe = my_task_id()
+! 2. Round-robin on the nodes
+type(ensemble_type), intent(inout) :: ens_handle
+integer, intent(in) :: nEns_members
+integer, intent(inout) :: layout_type
+if (layout_type /= 1 .and. layout_type /=2) call error_handler(E_ERR,'assign_tasks_to_pes', &
+ 'not a valid layout_type, must be 1 (standard) or 2 (round-robin)',source,revision,revdate)
+if (nEns_members >= num_pes) then ! if nEns_members >= task_count() then don't try to spread them out
+ call simple_layout(ens_handle, num_pes)
+ return
+if (tasks_per_node >= num_pes) then ! all tasks are on one node, don't try to spread them out
+ call simple_layout(ens_handle, num_pes)
+ return
+if (layout_type == 1) then
+ call simple_layout(ens_handle, num_pes)
+ call round_robin(ens_handle)
+end subroutine assign_tasks_to_pes
+subroutine round_robin(ens_handle)
+! Round-robin MPI task layout starting at the first node.
+! Starting on the first node forces pe 0 = task 0.
+! The smoother code assumes task 0 has an ensemble member.
+! If you want to break the assumption that pe 0 = task 0, this routine is a good
+! place to start. Test with the smoother.
+type(ensemble_type), intent(inout) :: ens_handle
+integer :: last_node_task_number, num_nodes
+integer :: i, j
+integer, allocatable :: count(:)
+! Find number of nodes and find number of tasks on last node
+call calc_tasks_on_each_node(num_nodes, last_node_task_number)
+count(:) = 1 ! keep track of the pes assigned to each node
+i = 0 ! keep track of the # of pes assigned
+do while (i < num_pes) ! until you run out of processors
+ do j = 1, num_nodes ! loop around the nodes
+ if(j == num_nodes) then ! special case for the last node - it could have fewer tasks than the other nodes
+ if(count(j) <= last_node_task_number) then
+ ens_handle%task_to_pe_list(tasks_per_node*(j-1) + count(j)) = i
+ count(j) = count(j) + 1
+ i = i + 1
+ endif
+ else
+ if(count(j) <= tasks_per_node) then
+ ens_handle%task_to_pe_list(tasks_per_node*(j-1) + count(j)) = i
+ count(j) = count(j) + 1
+ i = i + 1
+ endif
+ endif
+ enddo
+call create_pe_to_task_list(ens_handle)
+end subroutine round_robin
+subroutine create_pe_to_task_list(ens_handle)
+! Creates the ens_handle%pe_to_task_list
+! ens_handle%task_to_pe_list must have been assigned first, otherwise this
+! routine will just return nonsense.
+!FIXME set ens_handle%task_to_pe_list to -1 when it is allocated, then test if has been changed
+type(ensemble_type), intent(inout) :: ens_handle
+integer :: temp_sort(num_pes), idx(num_pes)
+integer :: ii
+temp_sort = ens_handle%task_to_pe_list
+call sort_task_list(temp_sort, idx, num_pes)
+do ii = 1, num_pes
+ ens_handle%pe_to_task_list(ii) = temp_sort(idx(ii))
+end subroutine create_pe_to_task_list
+subroutine calc_tasks_on_each_node(nodes, last_node_task_number)
+! Finds the of number nodes and how many tasks are on the last node, given the
+! number of tasks and the tasks_per_node (ptile).
+! The total number of tasks is num_pes = task_count()
+! The last node may have fewer tasks, for example, if ptile = 16 and the number of
+! mpi tasks = 17
+integer, intent(out) :: last_node_task_number, nodes
+if ( mod(num_pes, tasks_per_node) == 0) then
+ nodes = num_pes / tasks_per_node
+ last_node_task_number = tasks_per_node
+ nodes = num_pes / tasks_per_node + 1
+ last_node_task_number = tasks_per_node - (nodes*tasks_per_node - num_pes)
+end subroutine calc_tasks_on_each_node
+subroutine simple_layout(ens_handle, n)
+! assigns the arrays task_to_pe_list and pe_to_task list for the simple layout
+! where my_pe = my_task_id()
+type(ensemble_type), intent(inout) :: ens_handle
+integer, intent(in) :: n
+integer :: ii
+do ii = 0, num_pes - 1
+ ens_handle%task_to_pe_list(ii + 1) = ii
+ens_handle%pe_to_task_list = ens_handle%task_to_pe_list
+end subroutine simple_layout
+subroutine sort_task_list(x, idx, n)
+! sorts an array and returns the sorted array, and the index of the original
+! array
+integer, intent(in) :: n
+integer, intent(inout) :: x(n) ! array to be sorted
+integer, intent(out) :: idx(n) ! index of sorted array
+integer :: xcopy(n), i
+xcopy = x
+call index_sort(x, idx, n)
+do i = 1, n
+ x(i) = xcopy(idx(i))
+end subroutine sort_task_list
+function map_pe_to_task(ens_handle, p)
+! Return the physical task for my_pe
+type(ensemble_type), intent(in) :: ens_handle
+integer, intent(in) :: p
+integer :: map_pe_to_task
+map_pe_to_task = ens_handle%pe_to_task_list(p + 1)
+end function map_pe_to_task
+function map_task_to_pe(ens_handle, t)
+! Return my_pe corresponding to the physical task
+type(ensemble_type), intent(in) :: ens_handle
+integer, intent(in) :: t
+integer :: map_task_to_pe
+map_task_to_pe = ens_handle%task_to_pe_list(t + 1)
+end function map_task_to_pe
end module ensemble_manager_mod
Modified: DART/branches/development/ensemble_manager/ensemble_manager_mod.html
--- DART/branches/development/ensemble_manager/ensemble_manager_mod.html 2013-05-21 21:52:18 UTC (rev 6163)
+++ DART/branches/development/ensemble_manager/ensemble_manager_mod.html 2013-05-21 23:15:46 UTC (rev 6164)
@@ -69,7 +69,8 @@
<div class=namelist>
<em class=call>namelist / ensemble_manager_nml / </em>
- single_restart_file_in, single_restart_file_out, perturbation_amplitude
+ single_restart_file_in, single_restart_file_out, perturbation_amplitude,
+ use_copy2var_send_loop, use_var2copy_rec_loop, layout, tasks_per_node
@@ -112,8 +113,52 @@
the initial states into a set of members with enough spread and which
match the current set of observations.
Default: 0.2_r8</TD></TR>
+<TR><!--contents--><TD valign=top>use_copy2var_send_loop</TD>
+ <!-- type --><TD>logical</TD>
+ <!--descript--><TD>MPI communication flag* for all_copies_to_all_vars.
+ Default: .true.</TD></TR>
+<TR><!--contents--><TD valign=top>use_var2copy_rec_loop</TD>
+ <!-- type --><TD>logical</TD>
+ <!--descript--><TD>MPI communication flag* for all_vars_to_all_copies.
+ Default: .true.</TD></TR>
+ <TR><!--contents--><TD valign=top>layout</TD>
+ <!-- type --><TD valign=top>integer</TD>
+ <!--descript--><TD>Determines the process (pe) layout on MPI tasks.
+ 1 is pe = MPI task. 2 is a round-robin layout around the nodes. Layout 2
+ results in a more even usage of memory across nodes. This may allow you to run
+ with a larger state vector without hitting the memory limit of the node.
+ It may give a slight (5%) increase in performance, but this is machine dependent.
+ It has no effect on serial runs of filter.
+ Default: 1</TD></TR>
+<TR><!--contents--><TD valign=top>tasks_per_node</TD>
+ <!-- type --><TD>integer</TD>
+ <!--descript--><TD>The number of tasks per node. This
+ is only used if layout = 2.
+ Default: 1</TD></TR>
@@ Diff output truncated at 40000 characters. @@
More information about the Dart-dev
mailing list