130 lines
3.2 KiB
Python
130 lines
3.2 KiB
Python
import contextlib
|
|
import inspect
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
|
|
|
|
class UserError(Exception):
    """
    Error whose message is meant to be shown directly to the user.

    The message is a str.format() template; any additional positional
    arguments are substituted into it before the exception is initialised.
    """

    def __init__(self, message, *args):
        text = message.format(*args)
        super(UserError, self).__init__(text)
|
|
|
|
|
|
def main(fn):
    """
    Decorator for "main" functions. Decorates a function that should be
    called when the containing module is run as a script (e.g. via python -m
    <module>).

    If the module applying the decorator has __name__ == '__main__', the
    function is called immediately with the command line arguments
    (sys.argv[1:]) as positional arguments.

    The returned wrapper calls fn and converts two exceptions into exit codes:
    a UserError is reported as "Error: <message>" on stderr followed by
    sys.exit(1), and a KeyboardInterrupt leads to sys.exit(2). The wrapper
    itself returns None.
    """
    # Imported locally so this function does not depend on the module's
    # import block.
    import functools

    # Name of the module in which the decorator is being applied.
    caller_name = inspect.currentframe().f_back.f_globals['__name__']

    @functools.wraps(fn)
    def wrapped_fn(*args, **kwargs):
        try:
            fn(*args, **kwargs)
        except UserError as e:
            # sys.stderr.write() works on Python 2 and 3 alike, unlike the
            # Python 2 only "print >> sys.stderr" statement.
            sys.stderr.write('Error: {}\n'.format(e))
            sys.exit(1)
        except KeyboardInterrupt:
            sys.exit(2)

    if caller_name == '__main__':
        wrapped_fn(*sys.argv[1:])

    # Allow the main function also to be called explicitly
    return wrapped_fn
|
|
|
|
|
|
def rename_atomic(source_path, target_path):
    """
    Move the file at source_path to target_path.

    If both paths reside on the same device, os.rename() is used, otherwise
    the file is copied to a temporary name next to target_path (target_path
    with '~' appended), moved from there using os.rename(), and the source
    file is removed afterwards.

    Only the file contents are copied in the cross-device case
    (shutil.copyfile() is used).
    """
    # os.path.dirname() returns '' for a bare file name, which os.stat()
    # rejects; such a path lives in the current directory.
    source_dir = os.path.dirname(source_path) or os.curdir
    target_dir = os.path.dirname(target_path) or os.curdir

    source_dir_stat = os.stat(source_dir)
    target_dir_stat = os.stat(target_dir)

    if source_dir_stat.st_dev == target_dir_stat.st_dev:
        os.rename(source_path, target_path)
        return

    temp_path = target_path + '~'

    shutil.copyfile(source_path, temp_path)
    os.rename(temp_path, target_path)

    # Complete the move: without this the cross-device case would leave the
    # source behind, unlike the same-device case.
    os.remove(source_path)
|
|
|
|
|
|
@contextlib.contextmanager
def TemporaryDirectory():
    """
    Context manager providing a freshly created temporary directory.

    The directory is created with tempfile.mkdtemp() and its path is yielded
    to the with-block. When the block is left, whether normally or through an
    exception, the directory and everything inside it is removed.
    """
    temp_dir_path = tempfile.mkdtemp()

    try:
        yield temp_dir_path
    finally:
        shutil.rmtree(temp_dir_path)
|
|
|
|
|
|
@contextlib.contextmanager
def command_context(args, remove_env=[], set_env={}, working_dir=None, use_stderr=False):
    """
    Context manager that runs a command for the duration of a with-block.

    The command is started with subprocess.Popen and the Popen object is
    yielded to the block. When the block finishes normally, the process is
    waited for and a UserError is raised if its return code is non-zero. When
    the block raises, the process is killed and waited for, and the block's
    own exception propagates unchanged.

    args: the command and its arguments, as a sequence of strings.
    remove_env: names of environment variables to remove from the child's
        environment. Names that are not set are ignored.
    set_env: mapping of environment variables to set for the child.
    working_dir: working directory for the child (None inherits the current
        one).
    use_stderr: if true, the child's stderr is connected to a pipe
        (subprocess.PIPE) instead of being inherited.

    Raises UserError if the command cannot be started or exits with a
    non-zero return code.
    """
    # The default arguments are mutable objects but are only ever read here.
    env = dict(os.environ)

    for name in remove_env:
        # pop() instead of del: a variable that is already absent is fine.
        env.pop(name, None)

    env.update(set_env)

    stderr = subprocess.PIPE if use_stderr else None

    try:
        process = subprocess.Popen(args, env=env, cwd=working_dir, stderr=stderr)
    except OSError as e:
        raise UserError('Error running {}: {}', args[0], e)

    completed = False

    try:
        yield process
        completed = True
    finally:
        if not completed:
            try:
                process.kill()
            except OSError:
                # Ignore exceptions here so we don't mask the
                # already-being-thrown exception.
                pass

        # Use communicate so that we won't deadlock if the process generates
        # some unread output.
        process.communicate()

    # Only reached when the with-block completed normally. Checking the
    # return code outside the finally clause keeps a failure of the (possibly
    # killed) process from replacing an exception raised by the block.
    if process.returncode:
        raise UserError('Command failed: {}', ' '.join(args))
|
|
|
|
|
|
def command(args, remove_env=[], set_env={}, working_dir=None):
    """
    Run a command to completion.

    Starts the command through command_context() with the given environment
    changes and working directory and does nothing while it runs; any error
    raised by command_context() propagates to the caller.
    """
    runner = command_context(
        args,
        remove_env=remove_env,
        set_env=set_env,
        working_dir=working_dir)

    with runner:
        pass
|
|
|
|
|
|
def bash_escape_string(string):
    """
    Return string wrapped in single quotes for use in a bash command line.

    Every single quote inside the string is replaced by '"'"' (close the
    single-quoted section, emit a double-quoted single quote, reopen the
    single-quoted section).
    """
    escaped = string.replace("'", "'\"'\"'")
    return "'{}'".format(escaped)
|
|
|
|
|
|
def write_file(path, data):
    """
    Write data (bytes) to the file at path, replacing any existing file.

    The data is first written to a temporary file next to the target (path
    with '~' appended), which is then moved over path using os.rename(). If
    writing or renaming fails, the temporary file is removed again and the
    error propagates to the caller.
    """
    temp_path = path + '~'
    committed = False

    try:
        with open(temp_path, 'wb') as output:
            output.write(data)

        os.rename(temp_path, path)
        committed = True
    finally:
        if not committed:
            # Don't leave a stale temporary file behind; ignore errors here
            # so the original exception is the one that is reported.
            try:
                os.remove(temp_path)
            except OSError:
                pass
|
|
|
|
|
|
def read_file(path):
    """
    Return the complete contents of the file at path.

    The file is opened in binary mode, so the result is a byte string.
    """
    with open(path, 'rb') as source:
        contents = source.read()

    return contents
|