Source code for python_utils.time
from __future__ import absolute_import
import six
import time
import datetime
import itertools
# There might be a better way to get the epoch with tzinfo, please create
# a pull request if you know a better way that functions for Python 2 and 3
epoch = datetime.datetime(year=1970, month=1, day=1)
[docs]def timedelta_to_seconds(delta):
'''Convert a timedelta to seconds with the microseconds as fraction
Note that this method has become largely obsolete with the
`timedelta.total_seconds()` method introduced in Python 2.7.
>>> from datetime import timedelta
>>> '%d' % timedelta_to_seconds(timedelta(days=1))
'86400'
>>> '%d' % timedelta_to_seconds(timedelta(seconds=1))
'1'
>>> '%.6f' % timedelta_to_seconds(timedelta(seconds=1, microseconds=1))
'1.000001'
>>> '%.6f' % timedelta_to_seconds(timedelta(microseconds=1))
'0.000001'
'''
# Only convert to float if needed
if delta.microseconds:
total = delta.microseconds * 1e-6
else:
total = 0
total += delta.seconds
total += delta.days * 60 * 60 * 24
return total
[docs]def format_time(timestamp, precision=datetime.timedelta(seconds=1)):
'''Formats timedelta/datetime/seconds
>>> format_time('1')
'0:00:01'
>>> format_time(1.234)
'0:00:01'
>>> format_time(1)
'0:00:01'
>>> format_time(datetime.datetime(2000, 1, 2, 3, 4, 5, 6))
'2000-01-02 03:04:05'
>>> format_time(datetime.date(2000, 1, 2))
'2000-01-02'
>>> format_time(datetime.timedelta(seconds=3661))
'1:01:01'
>>> format_time(None)
'--:--:--'
>>> format_time(format_time) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
TypeError: Unknown type ...
'''
precision_seconds = precision.total_seconds()
if isinstance(timestamp, six.string_types + six.integer_types + (float, )):
try:
castfunc = six.integer_types[-1]
timestamp = datetime.timedelta(seconds=castfunc(timestamp))
except OverflowError: # pragma: no cover
timestamp = None
if isinstance(timestamp, datetime.timedelta):
seconds = timestamp.total_seconds()
# Truncate the number to the given precision
seconds = seconds - (seconds % precision_seconds)
return str(datetime.timedelta(seconds=seconds))
elif isinstance(timestamp, datetime.datetime): # pragma: no cover
# Python 2 doesn't have the timestamp method
if hasattr(timestamp, 'timestamp'):
seconds = timestamp.timestamp()
else:
seconds = timedelta_to_seconds(timestamp - epoch)
# Truncate the number to the given precision
seconds = seconds - (seconds % precision_seconds)
try: # pragma: no cover
if six.PY3:
dt = datetime.datetime.fromtimestamp(seconds)
else:
dt = datetime.datetime.utcfromtimestamp(seconds)
except ValueError: # pragma: no cover
dt = datetime.datetime.max
return str(dt)
elif isinstance(timestamp, datetime.date):
return str(timestamp)
elif timestamp is None:
return '--:--:--'
else:
raise TypeError('Unknown type %s: %r' % (type(timestamp), timestamp))
[docs]def timeout_generator(
timeout,
interval=datetime.timedelta(seconds=1),
iterable=itertools.count,
interval_multiplier=1.0,
):
'''
Generator that walks through the given iterable (a counter by default)
until the timeout is reached with a configurable interval between items
>>> for i in timeout_generator(0.1, 0.06):
... print(i)
0
1
2
>>> timeout = datetime.timedelta(seconds=0.1)
>>> interval = datetime.timedelta(seconds=0.06)
>>> for i in timeout_generator(timeout, interval, itertools.count()):
... print(i)
0
1
2
>>> for i in timeout_generator(1, interval=0.1, iterable='ab'):
... print(i)
a
b
>>> timeout = datetime.timedelta(seconds=0.1)
>>> interval = datetime.timedelta(seconds=0.06)
>>> for i in timeout_generator(timeout, interval, interval_multiplier=2):
... print(i)
0
1
'''
if isinstance(interval, datetime.timedelta):
interval = timedelta_to_seconds(interval)
if isinstance(timeout, datetime.timedelta):
timeout = timedelta_to_seconds(timeout)
if callable(iterable):
iterable = iterable()
interval *= interval_multiplier
time.sleep(interval)
if six.PY3: # pragma: no cover
timer = time.perf_counter
else: # pragma: no cover
timer = time.time
end = timeout + timer()
for item in iterable:
yield item
if timer() >= end:
break
interval *= interval_multiplier
time.sleep(interval)