Fire-and-forget async/background non-blocking tasks
I need to implement a retry policy for an incoming message queue containing thousands of relatively small messages.
Successfully processed messages should be immediately removed from the queue.
If an error occurs while processing a message, the message should be sent back to the end of the queue, and the pause before re-processing it should increase geometrically (1, 2, 4, 8, 16 seconds, and so on). In languages that support the async/await pattern, I'd simply create a delayed timer that triggers a fire-and-forget task, which would avoid blocking the main thread. How can this be implemented in IRIS?
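For reference, the retry schedule described above can be sketched in plain Python, outside IRIS. This is only an illustration under stated assumptions: the names `backoff_delay` and `drain`, the `max_attempts` limit, and the simulated clock are all hypothetical and not part of the question. Instead of one timer per message, it keeps failed messages in a heap ordered by due time, so a single loop never blocks on any individual message.

```python
import heapq
from collections import deque


def backoff_delay(attempt, base=1.0, factor=2.0):
    """Delay in seconds before retry number `attempt` (1-based): 1, 2, 4, 8, 16, ..."""
    return base * factor ** (attempt - 1)


def drain(messages, handler, max_attempts=5):
    """Process messages on a simulated clock (processing itself takes no time).

    `handler(message, failed_attempts)` returns True on success.
    Failed messages are re-queued with a geometrically growing delay.
    Returns a log of (time, message, outcome) tuples.
    """
    ready = deque((msg, 0) for msg in messages)  # (message, failed attempts so far)
    delayed = []  # min-heap of (due_time, seq, message, failed attempts)
    now, seq, log = 0.0, 0, []
    while ready or delayed:
        if not ready:
            # Nothing is ready: jump the simulated clock to the next due time.
            now = max(now, delayed[0][0])
        # Move every message whose delay has expired to the end of the ready queue.
        while delayed and delayed[0][0] <= now:
            _, _, msg, attempts = heapq.heappop(delayed)
            ready.append((msg, attempts))
        msg, attempts = ready.popleft()
        if handler(msg, attempts):
            log.append((now, msg, "ok"))  # success: the message leaves the queue
        elif attempts + 1 >= max_attempts:
            log.append((now, msg, "gave up"))  # assumption: stop after max_attempts
        else:
            attempts += 1
            seq += 1  # unique tie-breaker so the heap never compares messages
            heapq.heappush(delayed, (now + backoff_delay(attempts), seq, msg, attempts))
            log.append((now, msg, "retry"))
    return log
```

In a real process the simulated clock would be replaced by the wall clock and a short wait when nothing is ready; the ordering logic stays the same.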
Comments
Hi Dimitrii,
There are various options here. You can use the JOB command to start a new process and then continue with your main process, or you can use the Work Queue Manager (%SYSTEM.WorkMgr) to create a work queue and feed it with items to process.
Best Regards
Timo
Hi Timo,
Yesterday I experimented a lot with both Python and workers (and failed).
Here is an example. In this code I want the workers to be started as background tasks, so the program should first print "This should be printed first", and then each worker (each started with a random delay) should print its own message. This obviously doesn't happen because of the queue.Sync() call, but I don't want to wait for all the workers to complete.
Class DelayedTest Extends %RegisteredObject
{
ClassMethod Callback(interval As %String) As %Status
{
    Hang interval
    Write "Interval = ", interval, !
    Return $$$OK
}
Method RunWorkers()
{
    #Dim queue as %SYSTEM.WorkMgr
    Set queue = ##class(%SYSTEM.WorkMgr).%New()
    For i = 1:1:5
    {
        Set status = queue.Queue("..Callback", $RANDOM(5) + 1) // Minimal delay is 1 second
        $$$ThrowOnError(status)
    }
    Set status = queue.Sync()
    $$$ThrowOnError(status)
}
ClassMethod Main()
{
    #Dim d = ##class(DelayedTest).%New()
    Do d.RunWorkers()
    Write "This should be printed first"
}
}
Also, I'm not sure that Hang is appropriate here to emulate some delay. If it works like Sleep it should block the main thread.
Hi Dmitrii,
Why not Sync from main()?
Class User.DelayedTest Extends %RegisteredObject
{
ClassMethod Callback(interval As %String) As %Status
{
    Hang interval
    Write "Interval = ", interval, !
    Return $$$OK
}
Method RunWorkers(queue)
{
    #Dim queue as %SYSTEM.WorkMgr
    Set queue = ##class(%SYSTEM.WorkMgr).%New()
    For i = 1:1:5
    {
        Set status = queue.Queue("..Callback", $RANDOM(5) + 1) // Minimal delay is 1 second
        $$$ThrowOnError(status)
    }
}
ClassMethod Main()
{
    #Dim d = ##class(DelayedTest).%New()
    Do d.RunWorkers(.queue)
    Write "This should be printed first",!
    Set status = queue.Sync()
    $$$ThrowOnError(status)
    Write "Exiting...",!
}
}
Hi Alexey, that's the point! I expect the following sequence:
- the worker is created in background and waits for, say, 5 seconds. The main thread isn't blocked!
- the program prints "This should be printed first"
- the waiting interval of 5 seconds expires and the worker emits its own message
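The sequence listed above can be illustrated in plain Python with `threading.Timer`. This is a minimal sketch of the expected ordering only, run outside IRIS; the helper name `fire_and_forget` is hypothetical, and whether a timer thread inside embedded Python can safely call back into IRIS is an assumption that would need to be verified.

```python
import threading


def fire_and_forget(delay_s, callback, *args):
    """Start `callback(*args)` after `delay_s` seconds without blocking the caller."""
    timer = threading.Timer(delay_s, callback, args=args)
    timer.start()  # returns immediately; the callback runs later on the timer's own thread
    return timer


events = []

# 1. The worker is created in the background and waits; the main thread is not blocked.
timer = fire_and_forget(0.2, events.append, "worker message")

# 2. The main thread continues right away.
events.append("This should be printed first")

# 3. The interval expires and the worker emits its own message.
#    join() is used here only so the demo can inspect the result; a true
#    fire-and-forget caller would not wait.
timer.join()
print(events)
```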
Hi Dmitrii,
From the discussion in the comments it seems like you're trying to use "Workers", but did you try to use the Event mechanism for your use-case?
See also this related article (and this one as well).
By the way, here's a Docs reference on using this from Python.
I don't have the wider context, but IRIS Interoperability uses this Event mechanism behind the scenes for its components and for the messages and queues it manages. So if you are already using IRIS's Interoperability capabilities, you could implement this at the higher level of the Business Components in your Interoperability Production, rather than in lower-level code using the Event class.
The Workers mechanism you tried to use is intended more for distributing parallel work, and the Event API is more for messaging and queuing scenarios, which sounds more like your use-case.
This article might also be of interest to you as it discusses moving from "Workers" to "Events".
Hi Tani, and thanks a lot for the info! To avoid struggling with interprocess communication, I experimented a little more and ended up with Workers and Python (see below). I'd appreciate your opinion.
I finally managed to solve the problem in Python. It's not perfect but it works:
Class User.Timer Extends %RegisteredObject
{
Property Executor [ Private ];
Method Initialize(maxWorkers As %Integer = 4) [ Language = python ]
{
    import concurrent.futures
    import time
    import threading
    self.Executor = concurrent.futures.ThreadPoolExecutor(max_workers=maxWorkers)
}
Method Close() [ Language = python ]
{
    if self.Executor:
        self.Executor.shutdown()
}
Method Greet(name)
{
    Write "Hello ", name, !
}
Method OnCallback0(instance As %RegisteredObject, method As %String) [ Private ]
{
    Do $METHOD(instance, method)
}
Method OnCallback1(instance As %RegisteredObject, method As %String, arg1) [ Private ]
{
    Do $METHOD(instance, method, arg1)
}
Method OnCallback2(instance As %RegisteredObject, method As %String, arg1, arg2) [ Private ]
{
    Do $METHOD(instance, method, arg1, arg2)
}
Method OnCallback3(instance As %RegisteredObject, method As %String, arg1, arg2, arg3) [ Private ]
{
    Do $METHOD(instance, method, arg1, arg2, arg3)
}
Method OnCallback4(instance As %RegisteredObject, method As %String, arg1, arg2, arg3, arg4) [ Private ]
{
    Do $METHOD(instance, method, arg1, arg2, arg3, arg4)
}
Method OnCallback5(instance As %RegisteredObject, method As %String, arg1, arg2, arg3, arg4, arg5) [ Private ]
{
    Do $METHOD(instance, method, arg1, arg2, arg3, arg4, arg5)
}
Method InternalRun(delayMs As %Integer, wait As %Boolean, instance As %RegisteredObject, method As %String, args... As %List) [ Internal, Language = python ]
{
    import time
    import iris
    if not self.Executor:
        raise Exception("The 'Initialize' method has not been called.")
    def worker_function():
        # Sleep in the pool thread, then dispatch to the callback matching the argument count
        time.sleep(delayMs / 1000)
        if len(args) == 0:
            self.OnCallback0(instance, method)
        elif len(args) == 1:
            self.OnCallback1(instance, method, args[0])
        elif len(args) == 2:
            self.OnCallback2(instance, method, args[0], args[1])
        elif len(args) == 3:
            self.OnCallback3(instance, method, args[0], args[1], args[2])
        elif len(args) == 4:
            self.OnCallback4(instance, method, args[0], args[1], args[2], args[3])
        elif len(args) == 5:
            self.OnCallback5(instance, method, args[0], args[1], args[2], args[3], args[4])
        else:
            raise Exception("Too many arguments.")
        return 0
    future = self.Executor.submit(worker_function)
    # wait == 0 means fire-and-forget
    try:
        if (wait == 1):
            rv = future.result()
    except Exception as e:
        print(f"{e}")
}
/// delayMs - the timer delay in milliseconds
/// wait - if false, the process will not wait for the Future result to be returned (fire-and-forget)
/// instance - any object whose method should be called after the delay
/// method - the name of the object's callback method
/// args - the callback method arguments (up to 5)
Method Run(delayMs As %Integer, wait As %Boolean, instance As %RegisteredObject, method As %String, args... As %List)
{
    Do ..InternalRun(delayMs, wait, instance, method, args...)
}
ClassMethod Test()
{
    Set obj = ##class(Timer).%New()
    Do obj.Initialize()
    Do obj.Run(1000, 0, obj, "Greet", "John")
    Do obj.Run(2000, 0, obj, "Greet", "Jessica")
    Write "If 'wait == 0' this line will be printed first", !
    Do obj.Close()
}
}
Thanks for sharing back.
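One thing to watch in the Timer class above: when `wait` is 0, `future.result()` is never called, so an exception raised inside the worker stays stored in the Future and is not reported. A possible way to surface such failures without blocking is `Future.add_done_callback`. The sketch below is plain Python run outside IRIS; `run_delayed`, `report_failure`, `failing_task`, and the `errors` list are hypothetical names used only for illustration.

```python
import concurrent.futures
import time

errors = []  # collected failure messages; a real system might log or re-queue instead


def report_failure(future):
    """Done-callback: record the exception of a finished task, if there was one."""
    if future.cancelled():
        return
    exc = future.exception()  # returns immediately because the future is already done
    if exc is not None:
        errors.append(str(exc))


def run_delayed(executor, delay_s, func, *args):
    """Submit `func(*args)` to run after `delay_s` seconds; the caller does not wait."""
    def worker():
        time.sleep(delay_s)
        return func(*args)

    future = executor.submit(worker)
    future.add_done_callback(report_failure)
    return future


def failing_task(name):
    raise ValueError(f"cannot greet {name}")


# Leaving the `with` block calls shutdown(wait=True), like Close() in the class above.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    run_delayed(executor, 0.1, failing_task, "John")

print(errors)
```

The callback runs on the pool thread that finished the task, so the submitting code is never blocked by it.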