codereview: speed upload

Cuts time to upload trivial 160-file CL by 5x,
from 250 seconds to 50 seconds.

R=r
CC=golang-dev
https://golang.org/cl/1991047
diff --git a/lib/codereview/codereview.py b/lib/codereview/codereview.py
index d00b73b..fc6510f 100644
--- a/lib/codereview/codereview.py
+++ b/lib/codereview/codereview.py
@@ -6,7 +6,7 @@
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
 #
-#     http://www.apache.org/licenses/LICENSE-2.0
+#	http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
@@ -237,13 +237,11 @@
 			("subject", self.Subject()),
 		]
 
-		# NOTE(rsc): This duplicates too much of RealMain,
-		# but RealMain doesn't have the most reusable interface.
 		if self.name != "new":
 			form_fields.append(("issue", self.name))
 		vcs = None
 		if self.files:
-			vcs = MercurialVCS(upload_options, repo.root)
+			vcs = MercurialVCS(upload_options, ui, repo)
 			data = vcs.GenerateDiff(self.files)
 			files = vcs.GetBaseFiles(data)
 			if len(data) > MAX_UPLOAD_SIZE:
@@ -641,7 +639,7 @@
 	source = hg.parseurl(ui.expandpath("default"), None)[0]
 	try:
 		remoteui = hg.remoteui # hg 1.6
-        except:
+	except:
 		remoteui = cmdutil.remoteui
 	other = hg.repository(remoteui(repo, opts), source)
 	if proxy is not None:
@@ -712,7 +710,7 @@
 def ReplacementForCmdutilMatch(repo, pats=[], opts={}, globbed=False, default='relpath'):
 	taken = []
 	files = []
-        pats = pats or []
+	pats = pats or []
 	for p in pats:
 		if p.startswith('@'):
 			taken.append(p)
@@ -722,7 +720,7 @@
 			cl, err = LoadCL(repo.ui, repo, clname, web=False)
 			if err != '':
 				raise util.Abort("loading CL " + clname + ": " + err)
-			if cl.files == None:
+			if not cl.files:
 				raise util.Abort("no files in CL " + clname)
 			files = Add(files, cl.files)
 	pats = Sub(pats, taken) + ['path:'+f for f in files]
@@ -1543,19 +1541,18 @@
 	return cl, diffdata, ""
 
 def MySend(request_path, payload=None,
-           content_type="application/octet-stream",
-           timeout=None, force_auth=True,
-           **kwargs):
-     """Run MySend1 maybe twice, because Rietveld is unreliable."""
-     try:
-         return MySend1(request_path, payload, content_type, timeout, force_auth, **kwargs)
-     except Exception, e:
-         if type(e) == urllib2.HTTPError and e.code == 403:	# forbidden, it happens
-         	raise
-         print >>sys.stderr, "Loading "+request_path+": "+ExceptionDetail()+"; trying again in 2 seconds."
-     time.sleep(2)
-     return MySend1(request_path, payload, content_type, timeout, force_auth, **kwargs)
-
+		content_type="application/octet-stream",
+		timeout=None, force_auth=True,
+		**kwargs):
+	"""Run MySend1 maybe twice, because Rietveld is unreliable."""
+	try:
+		return MySend1(request_path, payload, content_type, timeout, force_auth, **kwargs)
+	except Exception, e:
+		if type(e) == urllib2.HTTPError and e.code == 403:	# forbidden, it happens
+			raise
+		print >>sys.stderr, "Loading "+request_path+": "+ExceptionDetail()+"; trying again in 2 seconds."
+		time.sleep(2)
+		return MySend1(request_path, payload, content_type, timeout, force_auth, **kwargs)
 
 # Like upload.py Send but only authenticates when the
 # redirect is to www.google.com/accounts.  This keeps
@@ -1799,7 +1796,7 @@
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
 #
-#     http://www.apache.org/licenses/LICENSE-2.0
+#	http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
@@ -2427,6 +2424,39 @@
 				StatusUpdate("  --> %s" % response_body)
 				sys.exit(1)
 
+		# Don't want to spawn too many threads, nor do we want to
+		# hit Rietveld too hard, or it will start serving 500 errors.
+		# When 8 works, it's no better than 4, and sometimes 8 is
+		# too many for Rietveld to handle.
+		MAX_PARALLEL_UPLOADS = 4
+
+		sema = threading.BoundedSemaphore(MAX_PARALLEL_UPLOADS)
+		upload_threads = []
+		finished_upload_threads = []
+		
+		class UploadFileThread(threading.Thread):
+			def __init__(self, args):
+				threading.Thread.__init__(self)
+				self.args = args
+			def run(self):
+				UploadFile(*self.args)
+				finished_upload_threads.append(self)
+				sema.release()
+
+		def StartUploadFile(*args):
+			sema.acquire()
+			while len(finished_upload_threads) > 0:
+				t = finished_upload_threads.pop()
+				upload_threads.remove(t)
+				t.join()
+			t = UploadFileThread(args)
+			upload_threads.append(t)
+			t.start()
+
+		def WaitForUploads():			
+			for t in upload_threads:
+				t.join()
+
 		patches = dict()
 		[patches.setdefault(v, k) for k, v in patch_list]
 		for filename in patches.keys():
@@ -2437,9 +2467,10 @@
 				file_id_str = file_id_str[file_id_str.rfind("_") + 1:]
 			file_id = int(file_id_str)
 			if base_content != None:
-				UploadFile(filename, file_id, base_content, is_binary, status, True)
+				StartUploadFile(filename, file_id, base_content, is_binary, status, True)
 			if new_content != None:
-				UploadFile(filename, file_id, new_content, is_binary, status, False)
+				StartUploadFile(filename, file_id, new_content, is_binary, status, False)
+		WaitForUploads()
 
 	def IsImage(self, filename):
 		"""Returns true if the filename has an image extension."""
@@ -2458,14 +2489,25 @@
 			return False
 		return not mimetype.startswith("text/")
 
+class FakeMercurialUI(object):
+	def __init__(self):
+		self.quiet = True
+		self.output = ''
+	
+	def write(self, s):
+		self.output += s
+
+use_hg_shell = False	# set to True to shell out to hg always; slower
 
 class MercurialVCS(VersionControlSystem):
 	"""Implementation of the VersionControlSystem interface for Mercurial."""
 
-	def __init__(self, options, repo_dir):
+	def __init__(self, options, ui, repo):
 		super(MercurialVCS, self).__init__(options)
+		self.ui = ui
+		self.repo = repo
 		# Absolute path to repository (we can be in a subdir)
-		self.repo_dir = os.path.normpath(repo_dir)
+		self.repo_dir = os.path.normpath(repo.root)
 		# Compute the subdir
 		cwd = os.path.normpath(os.getcwd())
 		assert cwd.startswith(self.repo_dir)
@@ -2532,7 +2574,14 @@
 		is_binary = False
 		oldrelpath = relpath = self._GetRelPath(filename)
 		# "hg status -C" returns two lines for moved/copied files, one otherwise
-		out = RunShell(["hg", "status", "-C", "--rev", self.base_rev, relpath])
+		if use_hg_shell:
+			out = RunShell(["hg", "status", "-C", "--rev", self.base_rev, relpath])
+		else:
+			fui = FakeMercurialUI()
+			ret = commands.status(fui, self.repo, *[relpath], **{'rev': [self.base_rev], 'copies': True})
+			if ret:
+				raise util.Abort(ret)
+			out = fui.output
 		out = out.splitlines()
 		# HACK: strip error message about missing file/directory if it isn't in
 		# the working copy
@@ -2547,13 +2596,15 @@
 		else:
 			base_rev = self.base_rev
 		if status != "A":
-			base_content = RunShell(["hg", "cat", "-r", base_rev, oldrelpath],
-				silent_ok=True)
+			if use_hg_shell:
+				base_content = RunShell(["hg", "cat", "-r", base_rev, oldrelpath], silent_ok=True)
+			else:
+				base_content = str(self.repo[base_rev][filename].data())
 			is_binary = "\0" in base_content  # Mercurial's heuristic
 		if status != "R":
 			new_content = open(relpath, "rb").read()
 			is_binary = is_binary or "\0" in new_content
-		if is_binary and base_content:
+		if is_binary and base_content and use_hg_shell:
 			# Fetch again without converting newlines
 			base_content = RunShell(["hg", "cat", "-r", base_rev, oldrelpath],
 				silent_ok=True, universal_newlines=False)