[Dart-dev] [4512] DART/trunk/ensemble_manager/ensemble_manager_mod.f90: Change all_vars_to_all_copies() so it has one task receiving

nancy at ucar.edu
Thu Oct 7 15:44:00 MDT 2010


Revision: 4512
Author:   nancy
Date:     2010-10-07 15:43:59 -0600 (Thu, 07 Oct 2010)
Log Message:
-----------
Change all_vars_to_all_copies() so it has one task receiving
round-robin while all the other tasks send concurrently.  This
turns out to be significantly faster with large state vectors
than the previous scheme of one round-robin sender with
concurrent receivers.  All the communications use synchronous
send/recv calls.  Presumably the sends execute concurrently and
the data has already been copied to local buffers, so the
receiving task sees less latency when it makes each receive
call.  The converse algorithm forces all receivers to wait in
turn while a single sender handshakes with each of the other
tasks.
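
For illustration only, here is a minimal stand-alone sketch of the
new pattern using raw MPI calls.  This is hypothetical demo code,
not the actual routine: the real code goes through the send_to()
and receive_from() wrappers in mpi_utilities_mod, and the slice
length and fill values below are made up.

program one_receiver_round_robin
use mpi
implicit none

integer, parameter :: n = 4            ! slice length, illustration only
integer :: ierr, ntasks, me, recv_pe, sending_pe
double precision :: my_slice(n)
double precision, allocatable :: all_slices(:,:)

call MPI_Init(ierr)
call MPI_Comm_size(MPI_COMM_WORLD, ntasks, ierr)
call MPI_Comm_rank(MPI_COMM_WORLD, me, ierr)

my_slice = dble(me)                    ! recognizable fill value
allocate(all_slices(n, ntasks))

! each task takes a turn as the single round-robin receiver
do recv_pe = 0, ntasks - 1
   if (me == recv_pe) then
      ! I'm the receiver this turn; drain one slice per sender
      do sending_pe = 0, ntasks - 1
         if (sending_pe == me) then
            all_slices(:, sending_pe+1) = my_slice   ! local copy
         else
            call MPI_Recv(all_slices(:, sending_pe+1), n, &
                          MPI_DOUBLE_PRECISION, sending_pe, 0, &
                          MPI_COMM_WORLD, MPI_STATUS_IGNORE, ierr)
         endif
      end do
   else
      ! every other task sends; these blocking sends can all
      ! be in flight at the same time
      call MPI_Send(my_slice, n, MPI_DOUBLE_PRECISION, recv_pe, 0, &
                    MPI_COMM_WORLD, ierr)
   endif
end do

deallocate(all_slices)
call MPI_Finalize(ierr)

end program one_receiver_round_robin

The point is that while the one receiver drains its messages in
rank order, every other task's blocking send can already be in
flight; in the converse scheme the single sender serializes the
handshakes with each receiver.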

Modified Paths:
--------------
    DART/trunk/ensemble_manager/ensemble_manager_mod.f90

Modified: DART/trunk/ensemble_manager/ensemble_manager_mod.f90
===================================================================
--- DART/trunk/ensemble_manager/ensemble_manager_mod.f90	2010-09-30 21:54:21 UTC (rev 4511)
+++ DART/trunk/ensemble_manager/ensemble_manager_mod.f90	2010-10-07 21:43:59 UTC (rev 4512)
@@ -22,13 +22,13 @@
 use utilities_mod,     only : register_module, do_nml_file, do_nml_term, &
                               error_handler, E_ERR, E_MSG, do_output, &
                               nmlfileunit, find_namelist_in_file,        &
-                              check_namelist_read
+                              check_namelist_read, timestamp, set_output
 use assim_model_mod,   only : aread_state_restart, awrite_state_restart, &
                               open_restart_read, open_restart_write,     &
                               close_restart, pert_model_state
 use time_manager_mod,  only : time_type, set_time
 use random_seq_mod,    only : random_seq_type, init_random_seq, random_gaussian
-use mpi_utilities_mod, only : task_count, my_task_id, send_to, receive_from
+use mpi_utilities_mod, only : task_count, my_task_id, send_to, receive_from, task_sync
 
 implicit none
 private
@@ -815,20 +815,30 @@
 
 !-----------------------------------------------------------------
 
-subroutine all_vars_to_all_copies(ens_handle)
+subroutine all_vars_to_all_copies(ens_handle, label)
 
 ! Converts from having subset of copies of all variables to having
 ! all copies of a subset of variables on a given PE.
+!
+! updated version of the original routine.  here, all tasks send
+! while 1 receives.  original was all tasks receiving while 1 sends.
+! apparently the sends can overlap and the execution time is less.
 
-type (ensemble_type), intent(inout) :: ens_handle
+type (ensemble_type), intent(inout)        :: ens_handle
+character (len=*),    intent(in), optional :: label
 
 integer,  allocatable :: var_list(:), copy_list(:)
 real(r8), allocatable :: transfer_temp(:)
 integer               :: num_copies, num_vars, my_num_vars, my_num_copies
 integer               :: max_num_vars, max_num_copies, num_copies_to_receive
-integer               :: sending_pe, recv_pe, k, sv, copy, num_vars_to_send
+integer               :: sending_pe, recv_pe, k, sv, var, num_vars_to_send
 integer               :: global_ens_index
 
+! for now, only output if there is a label
+if (present(label)) then
+   call timestamp_message('vars_to_copies start: '//label, alltasks=.true.)
+endif
+
 ! Accelerated version for single process
 if(num_pes == 1) then
    ens_handle%copies = transpose(ens_handle%vars)
@@ -847,69 +857,74 @@
 ! What is maximum number of copies stored on a var complete pe?
 max_num_copies = get_max_num_copies(num_copies)
 
-! Allocate storage to hold lists of vars and copies
 allocate(var_list(max_num_vars), transfer_temp(max_num_vars), &
    copy_list(max_num_copies))
 
-! Loop to give each pe a turn to send its vars
-SENDING_PE_LOOP: do sending_pe = 0, num_pes - 1
-   ! If I'm the sending pe, do this block
-   if(my_pe == sending_pe) then
-      ! Figure out what piece to send to each other PE and send it
-      SEND_TO_EACH: do recv_pe = 0, num_pes - 1
-         call get_var_list(num_vars, recv_pe, var_list, num_vars_to_send)
+! Loop to give each pe a turn to receive its copies
+RECEIVING_PE_LOOP: do recv_pe = 0, num_pes - 1
+   ! If I'm the receiving pe, do this block
+   if(my_pe == recv_pe) then
 
-         if (num_vars_to_send > 0) then
-            ! Loop to send these vars for each copy stored on my_pe
-            ALL_MY_COPIES: do k = 1, my_num_copies
-   
-               ! Fill up the transfer array
-               do sv = 1, num_vars_to_send
-                  transfer_temp(sv) = ens_handle%vars(var_list(sv), k)
+      ! Figure out what piece to receive from each other PE and receive it
+      RECEIVE_FROM_EACH: do sending_pe = 0, num_pes - 1
+         call get_copy_list(num_copies, sending_pe, copy_list, num_copies_to_receive)
+
+         ! Loop to receive for each copy stored on my_pe
+         ALL_MY_COPIES: do k = 1, num_copies_to_receive 
+
+            global_ens_index = copy_list(k)
+
+            ! If sending_pe is receiving_pe, just copy
+            if(sending_pe == recv_pe) then
+               do sv = 1, my_num_vars
+                  ens_handle%copies(global_ens_index, sv) = ens_handle%vars(ens_handle%my_vars(sv), k)
                end do
+            else
+               if (num_copies_to_receive > 0) then
+                  ! Otherwise, receive this part from the sending pe
+                  call receive_from(sending_pe, transfer_temp(1:my_num_vars)) 
    
-               ! If sending_pe is receiving_pe, just copy
-               if(sending_pe == recv_pe) then
-                  global_ens_index = ens_handle%my_copies(k)
-                  ens_handle%copies(global_ens_index, :) = transfer_temp(1:num_vars_to_send)
-               else
-                  ! Otherwise, ship this off
-                  call send_to(recv_pe, transfer_temp(1:num_vars_to_send)) 
+                  ! Copy the transfer array to my local storage
+                  ens_handle%copies(global_ens_index, :) = transfer_temp(1:my_num_vars)
                endif
-            end do ALL_MY_COPIES
-         endif
-      end do SEND_TO_EACH
+            endif
+         end do ALL_MY_COPIES
+      end do RECEIVE_FROM_EACH
    else
-      ! I'm not the sending PE, figure out what copies of my vars I'll receive from sending_pe
-      call get_copy_list(num_copies, sending_pe, copy_list, num_copies_to_receive)
+      ! I'm the sending PE, figure out what vars of my copies I'll send.
+      call get_var_list(num_vars, recv_pe, var_list, num_vars_to_send)
        
-      do copy = 1, num_copies_to_receive
-         if (my_num_vars > 0) then
-            ! Have to  use temp because %copies section is not contiguous storage
-            call receive_from(sending_pe, transfer_temp(1:my_num_vars))
-            ! Figure out which global ensemble member this is
-            global_ens_index = copy_list(copy)
-            ! Store this chunk in my local storage
-            ens_handle%copies(global_ens_index, :) = transfer_temp(1:my_num_vars)
-         endif
+      do k = 1, my_num_copies
+         do sv = 1, num_vars_to_send
+            ! Have to use temp because %var section is not contiguous storage
+            transfer_temp(sv) = ens_handle%vars(var_list(sv), k)
+         enddo
+         call send_to(recv_pe, transfer_temp(1:num_vars_to_send))
       end do
       
    endif
-end do SENDING_PE_LOOP
+end do RECEIVING_PE_LOOP
 
+
 ! Free up the temporary storage
 deallocate(var_list, transfer_temp, copy_list)
 
+! for now, only output if there is a label
+if (present(label)) then
+   call timestamp_message('vars_to_copies   end: '//label, alltasks=.true.)
+endif
+
 end subroutine all_vars_to_all_copies
 
 !-----------------------------------------------------------------
 
-subroutine all_copies_to_all_vars(ens_handle)
+subroutine all_copies_to_all_vars(ens_handle, label)
 
 ! Converts from having subset of copies of all variables to having
 ! all copies of a subset of variables on a given PE.
 
 type (ensemble_type), intent(inout) :: ens_handle
+character (len=*),    intent(in), optional :: label
 
 integer,  allocatable :: var_list(:), copy_list(:)
 real(r8), allocatable :: transfer_temp(:)
@@ -917,7 +932,13 @@
 integer               :: max_num_vars, max_num_copies, num_vars_to_receive
 integer               :: sending_pe, recv_pe, k, sv, copy, num_copies_to_send
 integer               :: global_ens_index
+logical :: oldval
 
+! for now, only output if there is a label
+if (present(label)) then
+   call timestamp_message('copies_to_vars start: '//label, alltasks=.true.)
+endif
+
 ! Accelerated version for single process
 if(num_pes == 1) then
    ens_handle%vars = transpose(ens_handle%copies)
@@ -943,6 +964,7 @@
 RECEIVING_PE_LOOP: do recv_pe = 0, num_pes - 1
    ! If I'm the receiving pe, do this block
    if(my_pe == recv_pe) then
+
       ! Figure out what piece to receive from each other PE and receive it
       RECEIVE_FROM_EACH: do sending_pe = 0, num_pes - 1
          call get_var_list(num_vars, sending_pe, var_list, num_vars_to_receive)
@@ -987,6 +1009,11 @@
 ! Free up the temporary storage
 deallocate(var_list, transfer_temp, copy_list)
 
+! for now, only output if there is a label
+if (present(label)) then
+   call timestamp_message('copies_to_vars   end: '//label, alltasks=.true.)
+endif
+
 end subroutine all_copies_to_all_vars
 
 !-----------------------------------------------------------------
@@ -1063,4 +1090,40 @@
 
 !--------------------------------------------------------------------------------
 
+subroutine timestamp_message(msg, sync, alltasks)
+
+character(len=*), intent(in) :: msg
+logical, intent(in), optional :: sync
+logical, intent(in), optional :: alltasks
+
+integer, save :: timestamp_level = 1
+logical :: should_output, old_flag
+character (len=129) :: tbuf
+
+! Write current time and message to stdout and log file.
+! if sync is present and true, sync mpi jobs before printing time.
+
+if (timestamp_level <= 0) return
+
+if (present(sync)) then
+  if (sync) call task_sync()
+endif
+
+should_output = do_output()
+if (present(alltasks)) then
+   if (alltasks) should_output = .true.
+endif
+
+if (should_output) then
+   old_flag = do_output()
+   call set_output(.true.)
+   write(tbuf, "(A,I4,A)") 'PE', my_pe, ': '//trim(msg)
+   call timestamp(trim(tbuf), pos='brief')  ! was debug
+   call set_output(old_flag)
+endif
+
+end subroutine timestamp_message
+
+!--------------------------------------------------------------------------------
+
 end module ensemble_manager_mod
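
For reference, a hedged sketch of how a caller might use the new
optional 'label' argument.  The call sites and label strings below
are illustrative assumptions, not part of this commit.  When a
label is present, timestamp_message() brackets the transpose with
per-PE times, and alltasks=.true. forces output from every PE via
set_output().

! hypothetical fragment: ens_handle is an ensemble_type that was
! initialized elsewhere, e.g. by init_ensemble_manager().
! passing a label turns on the bracketing timestamp messages:
call all_vars_to_all_copies(ens_handle, label='before assimilation')

! ... do work on the copy-complete distribution ...

call all_copies_to_all_vars(ens_handle, label='after assimilation')

! omitting the label preserves the old, quiet behavior:
call all_vars_to_all_copies(ens_handle)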

