Written by

UKK Köln
Question Dmitrii Baranov · Jul 15, 2025

Fire-and-forget async/background non-blocking tasks

I need to implement a retry policy for an incoming message queue containing thousands of relatively small messages.

Successfully processed messages should be immediately removed from the queue.

If an error occurs while processing a message, the message should be sent back at the end of the queue, and the pause before re-processing this message should increase geometrically (1-2-4-8-16 seconds, and so on). In languages that support the async/await pattern, I'd simply create a delayed timer that triggers a fire-and-forget task. This would prevent blocking the main thread. How can this be implemented in IRIS?

Product version: IRIS 2025.1

Comments

Timo Lindenschmid · Jul 15, 2025

Hi Dimitrii,

There are various options here. You can use $job to start new process then continue on with your main process or you can use WorkQueueManager to create a workqueue and feed it with items to process.

Best Regards

Timo

0
Dmitrii Baranov  Jul 16, 2025 to Timo Lindenschmid

Hi Timo,

Yesterday I experimented a lot with both Python and workers (and failed).

Here is an example. In this code I want the workers to be started as background tasks, so the program should first print "This should be printed first", then each worker (each started with random delay interval) shoud print its own message. This obviously doesn't happen because of the queue.Sync() call but I don't want to wait for all the workers to complete.

Class DelayedTest Extends %RegisteredObject
{

ClassMethod Callback(interval As %String) As %Status
{
	Hang interval
	Write "Interval = ", interval, !
	Return $$$OK
}

Method RunWorkers()
{
	#Dim queue as %SYSTEM.WorkMgr
	Set queue = ##class(%SYSTEM.WorkMgr).%New()
	For i = 1:1:5
	{
		Set status = queue.Queue("..Callback", $RANDOM(5) + 1) // Minimal delay is 1 second
		$$$ThrowOnError(status)
	}
	
	Set status = queue.Sync()
	$$$ThrowOnError(status)
}

ClassMethod Main()
{
	#Dim d = ##class(DelayedTest).%New()
	Do d.RunWorkers() 
	Write "This should be printed first" 
}

}

Also, I'm not sure that Hang is appropriate here to emulate some delay. If it works like Sleep it should block the main thread.

0
Alexey Maslov  Jul 16, 2025 to Dmitrii Baranov

Hi Dmitrii,

Why not Sync from main()? 

Class User.DelayedTest Extends%RegisteredObject
{

ClassMethod Callback(interval As%String) As%Status
{
    Hang interval
    Write"Interval = ", interval, !
    Return$$$OK
}

Method RunWorkers(queue)
{
    #Dim queue as%SYSTEM.WorkMgrSet queue = ##class(%SYSTEM.WorkMgr).%New()
    For i = 1:1:5
    {
        Set status = queue.Queue("..Callback", $RANDOM(5) + 1) // Minimal delay is 1 second$$$ThrowOnError(status)
    }
}

ClassMethod Main()
{
    #Dimd = ##class(DelayedTest).%New()
    Dod.RunWorkers(.queue)
    Write"This should be printed first",!
    Set status = queue.Sync()
    $$$ThrowOnError(status)
    Write"Exiting...",!
}

}
0
Dmitrii Baranov  Jul 16, 2025 to Alexey Maslov

Hi Alexey, that's the point! I expect the following sequence:

- the worker is created in background and waits for, say, 5 seconds. The main thread isn't blocked!

- the program prints "This should be printed first"

- the waiting interval of 5 seconds expires and the worker emits its own message

0
Tani Frankel · Jul 16, 2025

Hi Dmitrii,

From the discussion in the comments it seems like you're trying to use "Workers", but did you try to use the Event mechanism for your use-case?

See also this related article (and this one as well).

By the way here's a Docs reference of using this from Python.

I don't have a wider context of this, but in the IRIS Interoperability functionality, behind the scenes of the Interoperability components, and the Messages and Queues managed there, this Event mechanism is used. So perhaps if you are already using IRIS's Interoperability capabilities, you can implement this in the higher level of the Business Components in your Interoperability Production, rather than with the lower level code using the Event class.

The Workers mechanism you tried to use is intended more for distributing parallel work, and the Event API is more for messaging and queuing scenarios, which sounds more like your use-case.

This article might also be of interest to you as it discusses moving from "Workers" to "Events".

0
Dmitrii Baranov  Jul 16, 2025 to Tani Frankel

Hi Tani and thanks a lot for the info! To avoid struggling with interprocess communication, I experimented a little more and ended up with Workers and Python (see below). I'd appreciate your opinion

0
Dmitrii Baranov · Jul 16, 2025

I finally managed to solve the problem in Python. It's not perfect but it works:

Class User.Timer Extends %RegisteredObject
{

Property Executor [ Private ];

Method Initialize(maxWorkers As %Integer = 4) [ Language = python ]
{
import concurrent.futures
import time
import threading

self.Executor = concurrent.futures.ThreadPoolExecutor(max_workers=maxWorkers)
}

Method Close() [ Language = python ]
{
if self.Executor:
	self.Executor.shutdown()
}

Method Greet(name)
{
	Write "Hello ", name, !
}

Method OnCallback0(methodName As %String) [ Private ]
{
	Do $METHOD(instance, methodName)
}

Method OnCallback1(instance As %RegisteredObject, method As %String, arg1) [ Private ]
{
	Do $METHOD(instance, method, arg1)
}

Method OnCallback2(instance As %RegisteredObject, method As %String, arg1, arg2) [ Private ]
{
	Do $METHOD(instance, method, arg1, arg2)
}

Method OnCallback3(instance As %RegisteredObject, method As %String, arg1, arg2, arg3) [ Private ]
{
	Do $METHOD(instance, method, arg1, arg2, arg3)
}

Method OnCallback4(instance As %RegisteredObject, method As %String, arg1, arg2, arg3, arg4) [ Private ]
{
	Do $METHOD(instance, method, arg1, arg2, arg3, arg4)
}

Method OnCallback5(instance As %RegisteredObject, method As %String, arg1, arg2, arg3, arg4, arg5) [ Private ]
{
	Do $METHOD(instance, method, arg1, arg2, arg3, arg4, arg5)
}

Method InternalRun(delayMs As %Integer, wait As %Boolean, instance As %RegisteredObject, method As %String, args... As %List) [ Internal, Language = python ]
{
import time
import iris

if not self.Executor:
	raise Exception("The 'Initialize' method has not been called.")

def worker_function():
    time.sleep(delayMs / 1000)
    if len(args) == 0:
    	self.OnCallback0(instance, method)
    elif len(args) == 1:
        self.OnCallback1(instance, method, args[0])
    elif len(args) == 2:
        self.OnCallback2(instance, method, args[0], args[1])
    elif len(args) == 3:
        self.OnCallback3(instance, method, args[0], args[1], args[2])
    elif len(args) == 4:
        self.OnCallback4(instance, method, args[0], args[1], args[2], args[3])
    elif len(args) == 5:
        self.OnCallback5(instance, method, args[0], args[1], args[2], args[3], args[4])
    else:
        raise Exception("Too many arguments.")
    return 0
    
        
future = self.Executor.submit(worker_function)

# wait == 0 means fire-and-forget
try:
    if (wait == 1):
        rv = future.result()
    
except Exception as e:
    print(f"{e}")
}

/// delayMs  - the parameter specifies the timer delay in milliseconds
/// wait     - if the parameter is false, the process will not wait for the Future result to be returned (fire-and-forget)
/// instance - any object which method should be called with a delay
/// method   - specifies the object's callback method name
/// args     - the callback method arguments (up to 5)
Method Run(delayMs As %Integer, wait As %Boolean, instance As %RegisteredObject, method As %String, args... As %List)
{
	Do ..InternalRun(delayMs, wait, instance, method, args...)
}

ClassMethod Test()
{
	Set obj = ##class(Timer).%New()
	Do obj.Initialize()
	Do obj.Run(1000, 0, obj, "Greet", "John")
	Do obj.Run(2000, 0, obj, "Greet", "Jessica")
	Write "If 'wait == 0' this line will be printed first", !
	Do obj.Close()
}

}

0
Tani Frankel  Jul 21, 2025 to Dmitrii Baranov

Thanks for sharing back.

0