Application Development Blog Posts
Learn and share on deeper, cross technology development topics such as integration and connectivity, automation, cloud extensibility, developing at scale, and security.
matt
Active Contributor
Inspired by this blog by @bruno.esperanca, I thought I would share a useful, reusable class I developed to make parallel processing simple by abstracting away and encapsulating all the technical stuff.

 
CLASS zcl_thread_handler DEFINITION
  PUBLIC
  FINAL
  CREATE PUBLIC .

  PUBLIC SECTION.
    TYPE-POOLS abap .

    CONSTANTS:
      c_default_group TYPE rzlli_apcl VALUE 'parallel_generators', "#EC NOTEXT
      c_task          TYPE char6 VALUE 'PARALL'. "#EC NOTEXT

    METHODS:
      all_threads_are_finished
        RETURNING
          VALUE(r_empty) TYPE abap_bool,
      clear_thread
        IMPORTING
          !i_task TYPE char8,
      constructor
        IMPORTING
          !i_task_prefix TYPE char6 DEFAULT c_task
          !i_threads     TYPE i
          !i_group       TYPE rzlli_apcl DEFAULT c_default_group,
      handle_resource_failure,
      get_free_thread
        RETURNING
          VALUE(r_thread) TYPE char8 .

  PROTECTED SECTION.

  PRIVATE SECTION.

    TYPES:
      BEGIN OF ty_thread,
        thread TYPE char8,
        used   TYPE abap_bool,
      END OF ty_thread .

    DATA:
      task_prefix  TYPE char6,
      threads_list TYPE TABLE OF ty_thread WITH DEFAULT KEY,
      threads      TYPE i,
      used_threads TYPE i,
      group        TYPE rzlli_apcl.

    METHODS get_free_threads
      RETURNING
        VALUE(r_free_threads) TYPE i .
ENDCLASS.



CLASS zcl_thread_handler IMPLEMENTATION.

  METHOD get_free_threads.
    " Get number of free threads
    CALL FUNCTION 'SPBT_INITIALIZE'
      EXPORTING
        group_name                     = me->group
      IMPORTING
        free_pbt_wps                   = r_free_threads
      EXCEPTIONS
        invalid_group_name             = 1
        internal_error                 = 2
        pbt_env_already_initialized    = 3
        currently_no_resources_avail   = 4
        no_pbt_resources_found         = 5
        cant_init_different_pbt_groups = 6
        OTHERS                         = 7.

    CASE sy-subrc.
      WHEN 0. " Do nothing

      WHEN 3.
        " Already initialised - get current number of free threads
        CALL FUNCTION 'SPBT_GET_CURR_RESOURCE_INFO'
          IMPORTING
            free_pbt_wps                = r_free_threads
          EXCEPTIONS
            internal_error              = 1
            pbt_env_not_initialized_yet = 2
            OTHERS                      = 3.

        IF sy-subrc IS NOT INITIAL.
          " Something has gone seriously wrong, so end it here.
          MESSAGE ID sy-msgid TYPE 'X' NUMBER sy-msgno WITH sy-msgv1 sy-msgv2 sy-msgv3 sy-msgv4.
        ENDIF.

      WHEN OTHERS.
        " Something has gone seriously wrong, so end it here.
        MESSAGE ID sy-msgid TYPE 'X' NUMBER sy-msgno WITH sy-msgv1 sy-msgv2 sy-msgv3 sy-msgv4.

    ENDCASE.
  ENDMETHOD.

  METHOD all_threads_are_finished.
    r_empty = xsdbool( used_threads EQ 0 ).
  ENDMETHOD.

  METHOD clear_thread.
    READ TABLE me->threads_list WITH KEY used = abap_true thread = i_task
         ASSIGNING FIELD-SYMBOL(<thread>).
    IF sy-subrc IS NOT INITIAL.
      " Unknown or already released task - nothing to free
      RETURN.
    ENDIF.
    <thread>-used = abap_false.
    SUBTRACT 1 FROM used_threads.
  ENDMETHOD.


  METHOD constructor.
    me->group = i_group.
    me->task_prefix = i_task_prefix.

    " No more than 100 threads, and at least one
    IF i_threads GT 100.
      me->threads = 100.
    ELSEIF i_threads LE 0.
      me->threads = 1.
    ELSE.
      me->threads = i_threads.
    ENDIF.

    DATA(free_threads) = me->get_free_threads( ).

    " Use at most half of the free threads plus one, so at least one thread is always allowed
    free_threads = free_threads / 2 + 1.
    IF free_threads LT me->threads.
      me->threads = free_threads.
    ENDIF.

    " Initialise threads: each task name is the prefix plus a two-digit counter (00, 01, ...)
    DATA threadn TYPE n LENGTH 2 VALUE '00'.
    DO me->threads TIMES.
      INSERT VALUE #( thread = me->task_prefix && threadn used = abap_false )
             INTO TABLE me->threads_list.
      ADD 1 TO threadn.
    ENDDO.
  ENDMETHOD.

  METHOD handle_resource_failure.
    DATA(free_threads) = me->get_free_threads( ).
    IF free_threads LE 1 AND me->threads GT 1.
      SUBTRACT 1 FROM me->threads.
    ENDIF.

    WAIT UP TO 5 SECONDS. " Long enough for the system to update
    WAIT UNTIL me->used_threads LT me->threads. " Now there's an available thread
  ENDMETHOD.

  METHOD get_free_thread.
    " Wait for a free thread
    WAIT UNTIL me->used_threads LT me->threads.

    " Get number of first free thread
    READ TABLE me->threads_list WITH KEY used = abap_false ASSIGNING FIELD-SYMBOL(<thread>).

    ADD 1 TO used_threads.
    <thread>-used = abap_true.
    r_thread = <thread>-thread.
  ENDMETHOD.

ENDCLASS.

To use it, instantiate the class with a prefix for the task ID and the ideal number of threads you want to run in parallel. (The code automatically limits this to roughly half of the threads currently free in the server group: half plus one, so that at least one thread is always allowed.)
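As a rough illustration of how the limit works out, here is a minimal sketch. The prefix 'ZEXTR', the requested count of 20 and the figure of 12 free threads are assumptions made up for this example; the real result depends on what the server group reports at run time.

```abap
" Assumed scenario: the server group currently reports 12 free threads.
" Requested: 20 threads. The constructor caps this at 12 / 2 + 1 = 7.
DATA(handler) = NEW zcl_thread_handler( i_task_prefix = 'ZEXTR'
                                        i_threads     = 20 ).

" In that scenario get_free_thread( ) hands out the task names
" ZEXTR00, ZEXTR01, ... ZEXTR06.
" i_group is not supplied, so c_default_group is used.
```

The group passed to the constructor (or its default) should be the same one used later in DESTINATION IN GROUP, otherwise the free-thread count refers to a different set of servers than the one the tasks run on.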

Start the loop that contains the call to the logic you want to run in parallel. E.g.
LOOP AT ...
  me->do_stuff_in_parallel( ).
ENDLOOP.

The first statement after the ENDLOOP should be
WAIT UNTIL me->handler->all_threads_are_finished( ).

In your do_stuff_in_parallel method, you have the call to the function module that does the work.
DATA(thread) = me->handler->get_free_thread( ).

DATA errmsg TYPE char255.
CALL FUNCTION 'Z_...' STARTING NEW TASK thread
  DESTINATION IN GROUP zcl_thread_handler=>c_default_group
  CALLING on_end_of_action ON END OF TASK
  EXPORTING
    ...
  EXCEPTIONS
    communication_failure = 1 MESSAGE errmsg
    system_failure        = 2 MESSAGE errmsg
    resource_failure      = 3.

Finally, in method on_end_of_action of your main application (which has a single importing parameter, p_task TYPE clike), you receive the results.
...
DATA errmsg TYPE c LENGTH 255.
RECEIVE RESULTS FROM FUNCTION 'Z_...'
  IMPORTING
    ...
  EXCEPTIONS
    communication_failure = 1 MESSAGE errmsg
    system_failure        = 2 MESSAGE errmsg.

IF sy-subrc IS NOT INITIAL.
  ...handle error
ENDIF.

" Free the thread for the next task to run
me->handler->clear_thread( CONV char8( p_task ) ).

...handle receive logic

The task of writing ABAP Unit Tests is left to the reader!
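For readers who want a starting point, here is a minimal ABAP Unit sketch. It is not part of the original class: the test class name ltc_thread_handler and the prefix 'UNITTS' are made up, and it assumes that a blank group name selects the system's default server group and that at least one thread is free when the test runs (otherwise the constructor ends with the type X message shown above).

```abap
CLASS ltc_thread_handler DEFINITION FINAL FOR TESTING
  DURATION SHORT
  RISK LEVEL HARMLESS.

  PRIVATE SECTION.
    DATA handler TYPE REF TO zcl_thread_handler.

    METHODS:
      setup,
      first_thread_uses_prefix  FOR TESTING,
      clearing_frees_the_thread FOR TESTING.
ENDCLASS.

CLASS ltc_thread_handler IMPLEMENTATION.

  METHOD setup.
    " Assumption: a blank group name means the default server group
    handler = NEW #( i_task_prefix = 'UNITTS'
                     i_threads     = 1
                     i_group       = space ).
  ENDMETHOD.

  METHOD first_thread_uses_prefix.
    " The first task name is the prefix followed by the counter 00
    DATA(expected) = CONV char8( 'UNITTS00' ).
    cl_abap_unit_assert=>assert_equals( exp = expected
                                        act = handler->get_free_thread( ) ).
  ENDMETHOD.

  METHOD clearing_frees_the_thread.
    DATA(task) = handler->get_free_thread( ).
    cl_abap_unit_assert=>assert_false( act = handler->all_threads_are_finished( ) ).

    handler->clear_thread( task ).
    cl_abap_unit_assert=>assert_true( act = handler->all_threads_are_finished( ) ).
  ENDMETHOD.

ENDCLASS.
```

These tests only cover the bookkeeping of task names and counters. They still call SPBT_INITIALIZE through the constructor, so a more isolated test would need the resource lookup to be replaceable, which the class as posted does not offer.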

Edit:

After the CALL FUNCTION, you can use the handle_resource_failure method when the call fails with resource_failure. This waits until there are enough resources again. I used this when I'd written a BW extractor, and the person who set up the process chains was running the extractor some twenty times in parallel, with each run requesting 20 slots.

As a result, although there were initially enough free processes, they rapidly ran out. This is an issue if you have many programs using parallel processing at the same time, because the different programs don't communicate with each other at all: each assumes it has the whole system to itself.

It wouldn't take much effort to address this design issue. However, I only encountered it once; we cut the parallelism built into the process chain and let the extractor program handle all the parallelism.
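Putting the pieces together, a calling class might look roughly like the sketch below. This is one possible reading of the steps described above, not the original application: the class zcl_parallel_demo, the function module Z_DEMO_WORKER and its parameters i_item and e_result are hypothetical names, and the retry loop around CALL FUNCTION is an assumption about how handle_resource_failure is meant to be used.

```abap
CLASS zcl_parallel_demo DEFINITION PUBLIC FINAL CREATE PUBLIC.
  PUBLIC SECTION.
    METHODS constructor.
    METHODS run
      IMPORTING i_items TYPE string_table.
    " Callback for ON END OF TASK: public, single parameter p_task
    METHODS on_end_of_action
      IMPORTING p_task TYPE clike.

  PRIVATE SECTION.
    DATA handler TYPE REF TO zcl_thread_handler.
    DATA results TYPE string_table.
    METHODS do_stuff_in_parallel
      IMPORTING i_item TYPE string.
ENDCLASS.

CLASS zcl_parallel_demo IMPLEMENTATION.

  METHOD constructor.
    handler = NEW #( i_threads = 10 ).
  ENDMETHOD.

  METHOD run.
    LOOP AT i_items INTO DATA(item).
      do_stuff_in_parallel( item ).
    ENDLOOP.
    " Wait for every outstanding task to report back
    WAIT UNTIL handler->all_threads_are_finished( ).
  ENDMETHOD.

  METHOD do_stuff_in_parallel.
    DATA errmsg TYPE char255.
    DATA(thread) = handler->get_free_thread( ).

    DO.
      " Z_DEMO_WORKER is a hypothetical RFC-enabled function module
      CALL FUNCTION 'Z_DEMO_WORKER' STARTING NEW TASK thread
        DESTINATION IN GROUP zcl_thread_handler=>c_default_group
        CALLING on_end_of_action ON END OF TASK
        EXPORTING
          i_item                = i_item
        EXCEPTIONS
          communication_failure = 1 MESSAGE errmsg
          system_failure        = 2 MESSAGE errmsg
          resource_failure      = 3.

      CASE sy-subrc.
        WHEN 0.
          EXIT. " Task started
        WHEN 3.
          " No free resources: wait, then try the same task again
          handler->handle_resource_failure( ).
        WHEN OTHERS.
          " Task could not be started: release the slot (errmsg holds the reason)
          handler->clear_thread( thread ).
          EXIT.
      ENDCASE.
    ENDDO.
  ENDMETHOD.

  METHOD on_end_of_action.
    DATA errmsg TYPE char255.
    DATA result TYPE string.

    RECEIVE RESULTS FROM FUNCTION 'Z_DEMO_WORKER'
      IMPORTING
        e_result              = result
      EXCEPTIONS
        communication_failure = 1 MESSAGE errmsg
        system_failure        = 2 MESSAGE errmsg.

    IF sy-subrc IS INITIAL.
      APPEND result TO results.
    ENDIF.

    " Free the thread for the next task to run
    handler->clear_thread( CONV char8( p_task ) ).
  ENDMETHOD.

ENDCLASS.
```

Note that in this sketch the task keeps its slot while it waits in handle_resource_failure, so the wait ends only when another running task finishes or when no asynchronous tasks are outstanding at all.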
32 Comments
Florian
Active Contributor
Hi Matthew,

thank you for sharing. I have also something similar in my pocket and will now compare those together.

~Florian
shais
Participant

A good opportunity to mention Parallel ABAP Objects.

 

sitakant_tripathy2
Active Participant
Hi Matthew,

Look at the customizing mentioned below.

Financial Accounting (New) -> Contract Accounts Receivable and Payable -> Technical Settings -> Prepare Mass Activities.

This is a mass run framework used heavily within utilities. Unfortunately, not many people know how to customize it effectively for their own requirements.

Regards,

Sitakant.
matt
Active Contributor
But not suitable for every SAP solution, I think. Only ERP?
sitakant_tripathy2
Active Participant
True, that's only available in ECC.

The comment was for info only, in case you wanted to take some cues. I'm sure there must be other similar functions scattered across different product lines which people might not be aware of but which can be brilliant from an ideation point of view.

Regards,

Sitakant.
matt
Active Contributor
You should write a blog about it!
bruno_esperanca
Contributor
Thanks for the mention, matthew.billingham.

I was unaware that "on_end_of_action" could be a method. That's nice, I'll try it next (life) time.

Cheers!
bruno_esperanca
Contributor
Hey Matthew,

I had a closer look at the class, please let me know if my understanding is correct. Let's imagine execution starts at a time when the system is particularly overloaded and, for example, only 2 threads are available.

Am I correct to assume that, even if many more threads become available, this implementation will continue using a very small number of threads until the job is done?

Thanks,
Bruno
matt
Active Contributor
Correct.

But it's easy enough to adjust so that it dynamically raises/lowers the limit.
former_member557751
Participant
Hello,

I think I already mentioned it somewhere else, but there is already a nice, easy-to-use "framework" available in standard for parallel processing (and has been for a very long time). Please check function module SPTA_PARA_PROCESS_START_2. Check package SPTA. Also check the demo reports SPTA_PARA*.

I also had quite detailed documentation for it, but can't find it at the moment 😞

This blog post might also be helpful.

https://blogs.sap.com/2014/04/28/two-different-types-of-parallel-processing-examples/

The bgRFC framework is SAP's recommendation for RFC parallel processing. It is very powerful and has a very nice object-oriented API.

https://help.sap.com/viewer/753088fc00704d0a80e7fbd6803c8adb/7.5.7/en-US/48927c2caa6b17cee10000000a4...

This link might be useful as well.

https://de.scribd.com/document/349472871/bgRFC

And if you would like to do some really new fancy stuff, you can do parallel processing with message channels.

Happy coding, Tapio 😉
matt
Active Contributor
There is a very strong reason why I would not use SPTA_PARA_PROCESS_START_2. It relies on callbacks to FORMs. I made that comment in the blog you linked to:

  1. FORMs are now obsolete and shouldn't be used in new developments

  2. The FM can't be used in classes, so cannot be part of an ABAP Objects development.


As the blog author abhijit.mandal2 points out the FM itself is written in a fairly OO manner. It seems a bit strange, therefore, that the implementation of the FM must be procedural.

former_member557751
Participant
Hmm, when I do a where-used list for FM SPTA_PARA_PROCESS_START_2, there are a lot of classes which use this FM. At least in my EhP8 system. It might be different in an S4 system.

Anyway, I just wanted to mention it for reference. If I had to develop something new I think I would use bgRFC.

Best regards, Tapio
matt
Active Contributor

You can call SPTA_PARA_PROCESS_START_2 from within a class. But the callback is to a FORM.

CALL FUNCTION 'SPTA_PARA_PROCESS_START_2'
EXPORTING
server_group = rfcgroup
max_no_of_tasks = maxtasks
before_rfc_callback_form = 'BEFORE_RFC'
in_rfc_callback_form = 'IN_RFC'
after_rfc_callback_form = 'AFTER_RFC'
callback_prog = gd_repid
EXCEPTIONS
invalid_server_group = 1
no_resources_available = 2
OTHERS = 3.

I looked into the code to see if you could specify a class in callback_prog with methods in the …FORM parameters. And you can’t. It’s all

PERFORM (callback-form) IN PROGRAM (callback-prog)

There are no dynamic method calls. It looks, however, as though it would be trivial to put them in… Shame it’s not been done.

 

bruno_esperanca
Contributor
Thank you. I just wanted to confirm that I was understanding how the class works correctly.
Sandra_Rossi
Active Contributor
typo ALL -> CALL
matt
Active Contributor
No. I really meant "It's all..."
Sandra_Rossi
Active Contributor
ALL FUNCTION 'SPTA_PARA_PROCESS_START_2'
matt
Active Contributor
Ah - there. Thank-you! 🙂
0 Kudos
Hi,

 

I am using parallel processing with CALL FUNCTION <> STARTING NEW TASK. My test file has 342K lines. My program updates only the first 339K lines, without any omissions, but the last 2,500 lines are not updated. Initially I had a memory issue on an internal table (a roll memory issue), which I have rectified, but the job I am scheduling does not complete. Can you please let me know what the reason could be?
Abinathsiva
Active Contributor
Nice info, presented neatly thanks for sharing
mateuszadamus
Active Contributor
Hi Matthew,

Nice job. It's always good to have the business logic separated.

I've worked with something similar, but got rid of all the functions with logic executed in parallel and replaced them with object oriented approach. Basically, there's one Z function which is hardcoded in the parallel processing manager, this function then receives class name and data to be processed. An object is created from the class name which is then used to process the data.

A developer is only required to implement the processing class (with a specific interface) and then call the parallel processing manager with the object of the class and the data for processing.

 

regards,

Mateusz

PS: I know it's an old blog post. 😉
matt
Active Contributor
I did do that in one place, where I had to write a bunch of BW extractors, all needing parallel processing: implement an instance of an interface with one method to do the processing (selects, data manipulation). The class in this blog was part of that framework.
satrianisg
Member
Hi there Matthew! Excellent info! I have one question: how would you use the handle_resource_failure method?
0 Kudos

Hi everyone!

@mateuszadamus What you are writing about is very similar to what I did 🙂 And I really like the result. Could you take a look and give any comments or suggestions? This will help my project a lot.

https://github.com/victorizbitskiy/zconcurrency_api

@matthew.billingham Thanks for the good article!

matt
Active Contributor
I've explained in the text now.
Keith_Warnock
Advisor
Hi matthew.billingham

I am running through your example and I run into an issue when trying to put this code into my program.

It does not recognize 'Handler' as part of the class: 'Field "Handler" is unknown'.

I have tried linking it to an event, but then it removes the returning variable.

What could be missing?
WAIT UNTIL me->handler->all_threads_are_finished( ).

Best Regards
Keith_Warnock
Advisor
I removed the 'handler' piece from the code and it seems to work.

Thanks for contributing this example.
matt
Active Contributor
Thanks for the heads up. I've updated the blog.

Funny thing, I stopped using me-> just after posting this blog.
debuggertr1
Explorer
Hi Matthew

How can I create 'parallel_generators' as a group that is different from the empty one? In the function, if this value is left initial, the system gives us all threads. So how can I create a separate group?
c_default_group TYPE rzlli_apcl VALUE 'parallel_generators', 
matt
Active Contributor

I haven't set that up. When I developed this, we had a specific group for parallel processing. There are many ways of achieving what you want. I seem to recall when I applied this in another system, I created a setter method to set the group.

debuggertr1
Explorer
Because I couldn't set up parallel_generators as a separate group, I used the default one. But even though I used only half of the free threads, if someone else uses the class and starts using the other threads, I get a dump, because there is no free thread left.
matt
Active Contributor
I think you need to talk to your basis team about setting up groups.

As far as the dumps are concerned - yes, with this implementation that can happen. The way around it is to check for slot availability before attempting to use it.