Files
palemoon27/testing/web-platform/tests/tools/scripts/manifest.py
T

861 lines
25 KiB
Python
Executable File

#!/usr/bin/env python
import argparse
import json
import logging
import os
import re
import subprocess
import sys
import urlparse
from StringIO import StringIO
from abc import ABCMeta, abstractmethod, abstractproperty
from collections import defaultdict
from fnmatch import fnmatch
import _env
import html5lib
def get_git_func(repo_path):
def git(cmd, *args):
full_cmd = ["git", cmd] + list(args)
return subprocess.check_output(full_cmd, cwd=repo_path, stderr=subprocess.STDOUT)
return git
def is_git_repo(tests_root):
return os.path.exists(os.path.join(tests_root, ".git"))
_repo_root = None
def get_repo_root():
global _repo_root
if _repo_root is None:
git = get_git_func(os.path.dirname(__file__))
_repo_root = git("rev-parse", "--show-toplevel").rstrip()
return _repo_root
manifest_name = "MANIFEST.json"
ref_suffixes = ["_ref", "-ref"]
wd_pattern = "*.py"
blacklist = ["/", "/tools/", "/resources/", "/common/", "/conformance-checkers/", "_certs"]
logging.basicConfig()
logger = logging.getLogger("manifest")
logger.setLevel(logging.DEBUG)
def rel_path_to_url(rel_path, url_base="/"):
assert not os.path.isabs(rel_path)
if url_base[0] != "/":
url_base = "/" + url_base
if url_base[-1] != "/":
url_base += "/"
return url_base + rel_path.replace(os.sep, "/")
def url_to_rel_path(url, url_base):
url_path = urlparse.urlsplit(url).path
if not url_path.startswith(url_base):
raise ValueError, url
url_path = url_path[len(url_base):]
return url_path
def is_blacklisted(url):
for item in blacklist:
if item == "/":
if "/" not in url[1:]:
return True
elif url.startswith(item):
return True
return False
class ManifestItem(object):
__metaclass__ = ABCMeta
item_type = None
def __init__(self):
self.manifest = None
@abstractmethod
def key(self):
pass
def __eq__(self, other):
if not hasattr(other, "key"):
return False
return self.key() == other.key()
def __hash__(self):
return hash(self.key())
@abstractmethod
def to_json(self):
raise NotImplementedError
@classmethod
def from_json(self, manifest, obj):
raise NotImplementedError
@abstractproperty
def id(self):
pass
class URLManifestItem(ManifestItem):
def __init__(self, url, url_base="/"):
ManifestItem.__init__(self)
self.url = url
self.url_base = url_base
@property
def id(self):
return self.url
def key(self):
return self.item_type, self.url
@property
def path(self):
return url_to_rel_path(self.url, self.url_base)
def to_json(self):
rv = {"url": self.url}
return rv
@classmethod
def from_json(cls, manifest, obj):
return cls(obj["url"],
url_base=manifest.url_base)
class TestharnessTest(URLManifestItem):
item_type = "testharness"
def __init__(self, url, url_base="/", timeout=None):
URLManifestItem.__init__(self, url, url_base=url_base)
self.timeout = timeout
def to_json(self):
rv = {"url": self.url}
if self.timeout is not None:
rv["timeout"] = self.timeout
return rv
@classmethod
def from_json(cls, manifest, obj):
return cls(obj["url"],
url_base=manifest.url_base,
timeout=obj.get("timeout"))
class RefTest(URLManifestItem):
item_type = "reftest"
def __init__(self, url, ref_url, ref_type, url_base="/", timeout=None):
URLManifestItem.__init__(self, url, url_base=url_base)
if ref_type not in ["==", "!="]:
raise ValueError, "Unrecognised ref_type %s" % ref_type
self.ref_url = ref_url
self.ref_type = ref_type
self.timeout = timeout
@property
def id(self):
return (self.url, self.ref_type, self.ref_url)
def key(self):
return self.item_type, self.url, self.ref_type, self.ref_url
def to_json(self):
rv = {"url": self.url,
"ref_type": self.ref_type,
"ref_url": self.ref_url}
if self.timeout is not None:
rv["timeout"] = self.timeout
return rv
@classmethod
def from_json(cls, manifest, obj):
return cls(obj["url"],
obj["ref_url"],
obj["ref_type"],
url_base=manifest.url_base,
timeout=obj.get("timeout"))
class ManualTest(URLManifestItem):
item_type = "manual"
class Stub(URLManifestItem):
item_type = "stub"
class Helper(URLManifestItem):
item_type = "helper"
class WebdriverSpecTest(ManifestItem):
item_type = "wdspec"
def __init__(self, path):
ManifestItem.__init__(self)
self.path = path
@property
def id(self):
return self.path
def to_json(self):
return {"path": self.path}
def key(self):
return self.path
@classmethod
def from_json(cls, manifest, obj):
return cls(path=obj["path"])
class ManifestError(Exception):
pass
item_types = ["testharness", "reftest", "manual", "helper", "stub", "wdspec"]
class Manifest(object):
def __init__(self, git_rev=None, url_base="/"):
# Dict of item_type: {path: set(manifest_items)}
self._data = dict((item_type, defaultdict(set))
for item_type in item_types)
self.rev = git_rev
self.url_base = url_base
self.local_changes = LocalChanges(self)
def _included_items(self, include_types=None):
if include_types is None:
include_types = item_types
for item_type in include_types:
paths = self._data[item_type].copy()
for local_types, local_paths in self.local_changes.itertypes(item_type):
for path, items in local_paths.iteritems():
paths[path] = items
for path in self.local_changes.iterdeleted():
if path in paths:
del paths[path]
yield item_type, paths
def contains_path(self, path):
return any(path in paths for _, paths in self._included_items())
def add(self, item):
self._data[item.item_type][item.path].add(item)
item.manifest = self
def extend(self, items):
for item in items:
self.add(item)
def remove_path(self, path):
for item_type in item_types:
if path in self._data[item_type]:
del self._data[item_type][path]
def itertypes(self, *types):
if not types:
types = None
for item_type, items in self._included_items(types):
for item in sorted(items.items()):
yield item
def __iter__(self):
for item in self.itertypes():
yield item
def __getitem__(self, path):
for _, paths in self._included_items():
if path in paths:
return paths[path]
raise KeyError
def _committed_with_path(self, rel_path):
rv = set()
for paths_items in self._data.itervalues():
rv |= paths_items.get(rel_path, set())
return rv
def _committed_paths(self):
rv = set()
for paths_items in self._data.itervalues():
rv |= set(paths_items.keys())
return rv
def update(self,
tests_root,
url_base,
new_rev,
committed_changes=None,
local_changes=None,
remove_missing_local=False):
if local_changes is None:
local_changes = {}
if committed_changes is not None:
for rel_path, status in committed_changes:
self.remove_path(rel_path)
if status == "modified":
use_committed = rel_path in local_changes
manifest_items = get_manifest_items(tests_root,
rel_path,
url_base,
use_committed=use_committed)
self.extend(manifest_items)
self.local_changes = LocalChanges(self)
local_paths = set()
for rel_path, status in local_changes.iteritems():
local_paths.add(rel_path)
if status == "modified":
existing_items = self._committed_with_path(rel_path)
local_items = set(get_manifest_items(tests_root,
rel_path,
url_base,
use_committed=False))
updated_items = local_items - existing_items
self.local_changes.extend(updated_items)
else:
self.local_changes.add_deleted(path)
if remove_missing_local:
for path in self._committed_paths() - local_paths:
self.local_changes.add_deleted(path)
if new_rev is not None:
self.rev = new_rev
self.url_base = url_base
def to_json(self):
out_items = {
item_type: sorted(
test.to_json()
for _, tests in items.iteritems()
for test in tests
)
for item_type, items in self._data.iteritems()
}
rv = {"url_base": self.url_base,
"rev": self.rev,
"local_changes": self.local_changes.to_json(),
"items": out_items}
return rv
@classmethod
def from_json(cls, obj):
self = cls(git_rev=obj["rev"],
url_base=obj.get("url_base", "/"))
if not hasattr(obj, "iteritems"):
raise ManifestError
item_classes = {"testharness": TestharnessTest,
"reftest": RefTest,
"manual": ManualTest,
"helper": Helper,
"stub": Stub,
"wdspec": WebdriverSpecTest}
for k, values in obj["items"].iteritems():
if k not in item_types:
raise ManifestError
for v in values:
manifest_item = item_classes[k].from_json(self, v)
self.add(manifest_item)
self.local_changes = LocalChanges.from_json(self, obj["local_changes"])
return self
class LocalChanges(object):
def __init__(self, manifest):
self.manifest = manifest
self._data = dict((item_type, defaultdict(set)) for item_type in item_types)
self._deleted = set()
def add(self, item):
self._data[item.item_type][item.path].add(item)
item.manifest = self.manifest
def extend(self, items):
for item in items:
self.add(item)
def add_deleted(self, path):
self._deleted.add(path)
def is_deleted(self, path):
return path in self._deleted
def itertypes(self, *types):
for item_type in types:
yield item_type, self._data[item_type]
def iterdeleted(self):
for item in self._deleted:
yield item
def __getitem__(self, item_type):
return self._data[item_type]
def to_json(self):
rv = {"items": defaultdict(dict),
"deleted": []}
rv["deleted"].extend(self._deleted)
for test_type, paths in self._data.iteritems():
for path, tests in paths.iteritems():
rv["items"][test_type][path] = [test.to_json() for test in tests]
return rv
@classmethod
def from_json(cls, url_base, obj):
self = cls(url_base)
if not hasattr(obj, "iteritems"):
raise ManifestError
item_classes = {"testharness": TestharnessTest,
"reftest": RefTest,
"manual": ManualTest,
"helper": Helper,
"stub": Stub,
"wdspec": WebdriverSpecTest}
for test_type, paths in obj["items"].iteritems():
for path, tests in paths.iteritems():
for test in tests:
manifest_item = item_classes[test_type].from_json(self.manifest, test)
self.add(manifest_item)
for item in obj["deleted"]:
self.add_deleted(item)
return self
def get_ref(path):
base_path, filename = os.path.split(path)
name, ext = os.path.splitext(filename)
for suffix in ref_suffixes:
possible_ref = os.path.join(base_path, name + suffix + ext)
if os.path.exists(possible_ref):
return possible_ref
def markup_type(ext):
if not ext:
return None
if ext[0] == ".":
ext = ext[1:]
if ext in ["html", "htm"]:
return "html"
elif ext in ["xhtml", "xht"]:
return "xhtml"
elif ext == "svg":
return "svg"
return None
def get_timeout_meta(root):
return root.findall(".//{http://www.w3.org/1999/xhtml}meta[@name='timeout']")
def get_testharness_scripts(root):
return root.findall(".//{http://www.w3.org/1999/xhtml}script[@src='/resources/testharness.js']")
def get_reference_links(root):
match_links = root.findall(".//{http://www.w3.org/1999/xhtml}link[@rel='match']")
mismatch_links = root.findall(".//{http://www.w3.org/1999/xhtml}link[@rel='mismatch']")
return match_links, mismatch_links
def get_file(base_path, rel_path, use_committed):
if use_committed:
git = get_git_func(os.path.dirname(__file__))
blob = git("show", "HEAD:%s" % rel_path)
file_obj = ContextManagerStringIO(blob)
else:
path = os.path.join(base_path, rel_path)
file_obj = open(path)
return file_obj
class ContextManagerStringIO(StringIO):
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
self.close()
def get_manifest_items(tests_root, rel_path, url_base, use_committed=False):
if os.path.isdir(rel_path):
return []
url = rel_path_to_url(rel_path, url_base)
path = os.path.join(tests_root, rel_path)
if not use_committed and not os.path.exists(path):
return []
dir_path, filename = os.path.split(path)
name, ext = os.path.splitext(filename)
rel_dir_tree = rel_path.split(os.path.sep)
file_markup_type = markup_type(ext)
if filename.startswith("MANIFEST") or filename.startswith("."):
return []
if is_blacklisted(url):
return []
if name.startswith("stub-"):
return [Stub(url)]
if name.lower().endswith("-manual"):
return [ManualTest(url)]
if filename.endswith(".worker.js"):
return [TestharnessTest(url[:-3])]
ref_list = []
for suffix in ref_suffixes:
if name.endswith(suffix):
return [Helper(url)]
elif os.path.exists(os.path.join(dir_path, name + suffix + ext)):
ref_url, ref_ext = url.rsplit(".", 1)
ref_url = ref_url + suffix + ext
#Need to check if this is the right reftype
ref_list = [RefTest(url, ref_url, "==")]
# wdspec tests are in subdirectories of /webdriver excluding __init__.py
# files.
if (rel_dir_tree[0] == "webdriver" and
len(rel_dir_tree) > 2 and
filename != "__init__.py" and
fnmatch(filename, wd_pattern)):
return [WebdriverSpecTest(rel_path)]
if file_markup_type:
timeout = None
parser = {"html":lambda x:html5lib.parse(x, treebuilder="etree"),
"xhtml":ElementTree.parse,
"svg":ElementTree.parse}[file_markup_type]
with get_file(tests_root, rel_path, use_committed) as f:
try:
tree = parser(f)
except:
return [Helper(url)]
if hasattr(tree, "getroot"):
root = tree.getroot()
else:
root = tree
timeout_nodes = get_timeout_meta(root)
if timeout_nodes:
timeout_str = timeout_nodes[0].attrib.get("content", None)
if timeout_str and timeout_str.lower() == "long":
try:
timeout = timeout_str.lower()
except:
pass
if get_testharness_scripts(root):
return [TestharnessTest(url, timeout=timeout)]
else:
match_links, mismatch_links = get_reference_links(root)
for item in match_links + mismatch_links:
ref_url = urlparse.urljoin(url, item.attrib["href"])
ref_type = "==" if item.attrib["rel"] == "match" else "!="
reftest = RefTest(url, ref_url, ref_type, timeout=timeout)
if reftest not in ref_list:
ref_list.append(reftest)
return ref_list
return [Helper(url)]
def abs_path(path):
return os.path.abspath(path)
def chunks(data, n):
for i in range(0, len(data) - 1, n):
yield data[i:i+n]
class TestTree(object):
def __init__(self, tests_root, url_base):
self.tests_root = tests_root
self.url_base = url_base
def current_rev(self):
pass
def local_changes(self):
pass
def comitted_changes(self, base_rev=None):
pass
class GitTree(TestTree):
def __init__(self, tests_root, url_base):
TestTree.__init__(self, tests_root, url_base)
self.git = self.setup_git()
def setup_git(self):
assert is_git_repo(self.tests_root)
return get_git_func(self.tests_root)
def current_rev(self):
return self.git("rev-parse", "HEAD").strip()
def local_changes(self, path=None):
# -z is stable like --porcelain; see the git status documentation for details
cmd = ["status", "-z", "--ignore-submodules=all"]
if path is not None:
cmd.extend(["--", path])
rv = {}
data = self.git(*cmd)
if data == "":
return rv
assert data[-1] == "\0"
f = StringIO(data)
while f.tell() < len(data):
# First two bytes are the status in the stage (index) and working tree, respectively
staged = f.read(1)
worktree = f.read(1)
assert f.read(1) == " "
if staged == "R":
# When a file is renamed, there are two files, the source and the destination
files = 2
else:
files = 1
filenames = []
for i in range(files):
filenames.append("")
char = f.read(1)
while char != "\0":
filenames[-1] += char
char = f.read(1)
if not is_blacklisted(rel_path_to_url(filenames[0], self.url_base)):
rv.update(self.local_status(staged, worktree, filenames))
return rv
def committed_changes(self, base_rev=None):
if base_rev is None:
logger.debug("Adding all changesets to the manifest")
return [(item, "modified") for item in self.paths()]
logger.debug("Updating the manifest from %s to %s" % (base_rev, self.current_rev()))
rv = []
data = self.git("diff", "-z", "--name-status", base_rev + "..HEAD")
items = data.split("\0")
for status, filename in chunks(items, 2):
if is_blacklisted(rel_path_to_url(filename, self.url_base)):
continue
if status == "D":
rv.append((filename, "deleted"))
else:
rv.append((filename, "modified"))
return rv
def paths(self):
data = self.git("ls-tree", "--name-only", "--full-tree", "-r", "HEAD")
return [item for item in data.split("\n") if not item.endswith(os.path.sep)]
def local_status(self, staged, worktree, filenames):
# Convert the complex range of statuses that git can have to two values
# we care about; "modified" and "deleted" and return a dictionary mapping
# filenames to statuses
rv = {}
if (staged, worktree) in [("D", "D"), ("A", "U"), ("U", "D"), ("U", "A"),
("D", "U"), ("A", "A"), ("U", "U")]:
raise Exception("Can't operate on tree containing unmerged paths")
if staged == "R":
assert len(filenames) == 2
dest, src = filenames
rv[dest] = "modified"
rv[src] = "deleted"
else:
assert len(filenames) == 1
filename = filenames[0]
if staged == "D" or worktree == "D":
# Actually if something is deleted in the index but present in the worktree
# it will get included by having a status of both "D " and "??".
# It isn't clear whether that's a bug
rv[filename] = "deleted"
elif staged == "?" and worktree == "?":
# A new file. If it's a directory, recurse into it
if os.path.isdir(os.path.join(self.tests_root, filename)):
rv.update(self.local_changes(filename))
else:
rv[filename] = "modified"
else:
rv[filename] = "modified"
return rv
class NoVCSTree(TestTree):
"""Subclass that doesn't depend on git"""
ignore = ["*.py[c|0]", "*~", "#*"]
def current_rev(self):
return None
def local_changes(self):
# Put all files into local_changes and rely on Manifest.update to de-dupe
# changes that in fact comitted at the base rev.
rv = []
for dir_path, dir_names, filenames in os.walk(self.tests_root):
for filename in filenames:
if any(fnmatch(filename, pattern) for pattern in self.ignore):
continue
rel_path = os.path.relpath(os.path.join(dir_path, filename),
self.tests_root)
if is_blacklisted(rel_path_to_url(rel_path, self.url_base)):
continue
rv.append((rel_path, "modified"))
return dict(rv)
def committed_changes(self, base_rev=None):
return None
def load(manifest_path):
if os.path.exists(manifest_path):
logger.debug("Opening manifest at %s" % manifest_path)
else:
logger.debug("Creating new manifest at %s" % manifest_path)
try:
with open(manifest_path) as f:
manifest = Manifest.from_json(json.load(f))
except IOError as e:
manifest = Manifest(None)
return manifest
def update(tests_root, url_base, manifest, ignore_local=False):
global ElementTree
global html5lib
try:
from xml.etree import cElementTree as ElementTree
except ImportError:
from xml.etree import ElementTree
import html5lib
if is_git_repo(tests_root):
tests_tree = GitTree(tests_root, url_base)
remove_missing_local = False
else:
tests_tree = NoVCSTree(tests_root, url_base)
remove_missing_local = not ignore_local
if not ignore_local:
local_changes = tests_tree.local_changes()
else:
local_changes = None
manifest.update(tests_root,
url_base,
tests_tree.current_rev(),
tests_tree.committed_changes(manifest.rev),
local_changes,
remove_missing_local=remove_missing_local)
def write(manifest, manifest_path):
with open(manifest_path, "w") as f:
json.dump(manifest.to_json(), f, sort_keys=True, indent=2, separators=(',', ': '))
def update_from_cli(**kwargs):
tests_root = kwargs["tests_root"]
path = kwargs["path"]
assert tests_root is not None
if not kwargs.get("rebuild", False):
manifest = load(path)
else:
manifest = Manifest(None)
logger.info("Updating manifest")
update(tests_root,
kwargs["url_base"],
manifest,
ignore_local=kwargs.get("ignore_local", False))
write(manifest, path)
def abs_path(path):
return os.path.abspath(os.path.expanduser(path))
def create_parser():
parser = argparse.ArgumentParser()
parser.add_argument(
"-p", "--path", type=abs_path, help="Path to manifest file.")
parser.add_argument(
"--tests-root", type=abs_path, help="Path to root of tests.")
parser.add_argument(
"-r", "--rebuild", action="store_true", default=False,
help="Force a full rebuild of the manifest.")
parser.add_argument(
"--ignore-local", action="store_true", default=False,
help="Don't include uncommitted local changes in the manifest.")
parser.add_argument(
"--url-base", action="store", default="/",
help="Base url to use as the mount point for tests in this manifest.")
return parser
if __name__ == "__main__":
opts = create_parser().parse_args()
if opts.tests_root is None:
opts.tests_root = get_repo_root()
if opts.path is None:
opts.path = os.path.join(opts.tests_root, "MANIFEST.json")
update_from_cli(**vars(opts))