Waiting for cancellation
Each token has a wait() method, which allows you to wait for its cancellation.
from cantok import TimeoutToken
token = TimeoutToken(5)
token.wait() # It will take about 5 seconds.
token.check() # Since the timeout has expired, an exception will be raised.
#> ...
#> cantok.errors.TimeoutCancellationError: The timeout of 5 seconds has expired.
If you add the await keyword before calling wait(), the method automatically runs in non-blocking mode:
import asyncio
from cantok import SimpleToken
async def do_something(token):
    await asyncio.sleep(3)  # Imitation of some real async activity.
    token.cancel()

async def main():
    token = SimpleToken()
    await asyncio.gather(do_something(token), token.wait())
    print('Something has been done!')

asyncio.run(main())
Yes, it looks like magic, and it is magic. The method itself finds out how it was used: inside an expression with or without the await keyword. Without await, it blocks the calling thread and runs in CPU-saving mode; with await, it runs in non-blocking event-loop mode.
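To make the idea less mysterious, here is a minimal sketch of one way a call could behave differently with and without await. It is an illustration only and not necessarily how cantok implements wait(). The names DualModeWait, expires_after, blocking_demo, and awaiting_demo are hypothetical, and the blocking branch assumes CPython, where an unused temporary object is finalized immediately.

```python
import asyncio
import time


class DualModeWait:
    """Hypothetical helper: an awaitable that blocks instead if it is never awaited."""

    def __init__(self, is_cancelled, step=0.001):
        self._is_cancelled = is_cancelled  # zero-argument callable returning bool
        self._step = step
        self._awaited = False

    def __await__(self):
        # Reached only through `await`: remember that, then poll without blocking.
        self._awaited = True
        return self._poll_async().__await__()

    async def _poll_async(self):
        while not self._is_cancelled():
            await asyncio.sleep(self._step)

    def __del__(self):
        # Assumption: CPython frees an unused temporary right away, so a bare
        # call ends up here immediately and blocks the calling thread.
        if not self._awaited:
            while not self._is_cancelled():
                time.sleep(self._step)


def expires_after(seconds):
    """Return a callable that becomes True once `seconds` have elapsed."""
    deadline = time.monotonic() + seconds
    return lambda: time.monotonic() >= deadline


def blocking_demo():
    start = time.monotonic()
    DualModeWait(expires_after(0.05))  # no await: blocks until the deadline
    return time.monotonic() - start


async def awaiting_demo():
    ticks = []

    async def ticker():
        for i in range(3):
            ticks.append(i)
            await asyncio.sleep(0.01)

    # Awaited (here through gather): the event loop keeps running ticker().
    await asyncio.gather(DualModeWait(expires_after(0.05)), ticker())
    return ticks


if __name__ == "__main__":
    print(f"blocking call took {blocking_demo():.2f} s")
    print("ticks collected while awaiting:", asyncio.run(awaiting_demo()))
```

The sketch relies on reference-counting behavior that other Python implementations may not share, so treat it as an explanation of the idea and not as a pattern to copy.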
In addition to the above, the wait() method has two optional arguments:
- timeout (int or float): the maximum waiting time in seconds. If this time is exceeded, a TimeoutCancellationError exception will be raised. By default, the timeout is not set.
- step (int or float, by default 0.0001): the duration of each iteration during which the token state is polled, in seconds. You cannot set this value to a number that exceeds the timeout.
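The interplay of the two arguments can be pictured with a small standard-library sketch. It is not cantok's code: wait_until is a hypothetical function, the built-in TimeoutError stands in for TimeoutCancellationError, and raising ValueError when step exceeds timeout is an assumption made for this illustration.

```python
import time


def wait_until(is_cancelled, step=0.0001, timeout=None):
    """Poll is_cancelled() every `step` seconds until it returns True.

    Raises TimeoutError (a stand-in for TimeoutCancellationError) if
    `timeout` seconds pass first.
    """
    if timeout is not None and step > timeout:
        # Assumed behavior for this sketch: reject a step longer than the timeout.
        raise ValueError("step cannot exceed timeout")

    start = time.monotonic()
    while not is_cancelled():
        if timeout is not None and time.monotonic() - start >= timeout:
            raise TimeoutError(f"not cancelled within {timeout} seconds")
        time.sleep(step)  # sleeping between polls keeps CPU usage low


if __name__ == "__main__":
    wait_until(lambda: True)  # already cancelled: returns immediately
    try:
        wait_until(lambda: False, step=0.01, timeout=0.05)
    except TimeoutError as error:
        print("timed out:", error)
```

A smaller step notices cancellation sooner at the cost of more frequent wake-ups, while a larger step does the opposite.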