| Server IP : 199.250.200.62 / Your IP : 216.73.216.15 Web Server : Apache System : Linux vps37394.inmotionhosting.com 3.10.0-1160.119.1.vz7.224.4 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64 User : jasonp18 ( 1000) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/root/proc/3/cwd/proc/3/root/opt/imh-python/lib/python3.9/site-packages/amqp/ |
Upload File : |
"""Compatibility utilities."""
import logging
from logging import NullHandler
# enables celery 3.1.23 to start again
from vine import promise # noqa
from vine.utils import wraps
try:
import fcntl
except ImportError: # pragma: no cover
fcntl = None # noqa
def set_cloexec(fd, cloexec):
"""Set flag to close fd after exec."""
if fcntl is None:
return
try:
FD_CLOEXEC = fcntl.FD_CLOEXEC
except AttributeError:
raise NotImplementedError(
'close-on-exec flag not supported on this platform',
)
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
if cloexec:
flags |= FD_CLOEXEC
else:
flags &= ~FD_CLOEXEC
return fcntl.fcntl(fd, fcntl.F_SETFD, flags)
def coro(gen):
"""Decorator to mark generator as a co-routine."""
@wraps(gen)
def _boot(*args, **kwargs):
co = gen(*args, **kwargs)
next(co)
return co
return _boot
def str_to_bytes(s):
"""Convert str to bytes."""
if isinstance(s, str):
return s.encode('utf-8', 'surrogatepass')
return s
def bytes_to_str(s):
"""Convert bytes to str."""
if isinstance(s, bytes):
return s.decode('utf-8', 'surrogatepass')
return s
def get_logger(logger):
"""Get logger by name."""
if isinstance(logger, str):
logger = logging.getLogger(logger)
if not logger.handlers:
logger.addHandler(NullHandler())
return logger