Waiting for cancellation
Each token has wait()
method, which allows you to wait for its cancellation.
from cantok import TimeoutToken
token = TimeoutToken(5)
token.wait() # It will take about 5 seconds.
token.check() # Since the timeout has expired, an exception will be raised.
# cantok.errors.TimeoutCancellationError: The timeout of 5 seconds has expired.
If you add the await
keyword before calling wait()
, the method will be automatically used in non-blocking mode:
import asyncio
from cantok import SimpleToken
async def do_something(token):
await asyncio.sleep(3) # Imitation of some real async activity.
token.cancel()
async def main():
token = SimpleToken()
await asyncio.gather(do_something(token), token.wait())
print('Something has been done!')
asyncio.run(main())
Yes, it looks like magic, it is magic. The method itself finds out how it was used: inside an expression with or without the await
keyword. In the first case, it runs in CPU-saving mode, in the second - in non-blocking event-loop mode.
In addition to the above, the wait()
method has two optional arguments:
timeout
(int
orfloat
) - the maximum waiting time in seconds. If this time is exceeded, aTimeoutCancellationError
exception will be raised. By default, thetimeout
is not set.step
(int
orfloat
, by default0.0001
) - the duration of the iteration, once in which the token state is polled, in seconds. For obvious reasons, you cannot set this value to a number that exceeds thetimeout
.