blob: 3c13d254e439fd7c75d11c8ce67231c96f756c8f [file]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Handler for data copy operation.
The generic datastore admin console transfers control to ConfirmCopyHandler
after the user selects entities. ConfirmCopyHandler asks the user to confirm
the selection and enter the target application id, then transfers control to
DoCopyHandler. DoCopyHandler starts the copying mappers and displays a
confirmation page.

This module also contains the actual mapper code for copying data over.
"""
import logging
import urllib
from google.appengine.api import datastore
from google.appengine.api import capabilities
from google.appengine.datastore import datastore_rpc
from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.datastore_admin import remote_api_put_stub
from google.appengine.ext.datastore_admin import utils
from google.appengine.ext.mapreduce import context
from google.appengine.ext.mapreduce import operation
# Action name passed to utils.CreateXsrfToken when the confirmation page is
# rendered and to utils.ValidateXsrfToken when the copy form is posted.
XSRF_ACTION = 'copy'
class ConfirmCopyHandler(webapp.RequestHandler):
  """Renders the page asking the admin to confirm a copy operation."""

  SUFFIX = 'confirm_copy'

  @classmethod
  def Render(cls, handler):
    """Render the copy confirmation page; meant to be called by main.py.

    Reads the selected namespace and kinds from the request, gathers the
    warning flags shown on the page and renders 'confirm_copy.html'.

    Args:
      handler: the webapp.RequestHandler invoking the method
    """
    request = handler.request
    namespace = request.get('namespace')
    kinds = request.get('kind', allow_multiple=True)
    sizes_known, size_total, remainder = utils.ParseKindsAndSizes(kinds)
    namespace_str, kind_str = utils.GetPrintableStrs(namespace, kinds)

    # True while datastore writes are enabled, i.e. the app is not read-only.
    write_capability = capabilities.CapabilitySet(
        'datastore_v3', capabilities=['write'])
    writes_enabled = write_capability.is_enabled()

    # True when at least one BlobInfo record exists.
    has_blobs = bool(blobstore.BlobInfo.all().fetch(1))

    connection = datastore._GetConnection()
    is_high_replication = (
        connection.get_datastore_type() ==
        datastore_rpc.Connection.HIGH_REPLICATION_DATASTORE)

    template_params = {}
    template_params['form_target'] = DoCopyHandler.SUFFIX
    template_params['kind_list'] = kinds
    template_params['remainder'] = remainder
    template_params['sizes_known'] = sizes_known
    template_params['size_total'] = size_total
    template_params['app_id'] = request.get('app_id')
    template_params['cancel_url'] = request.get('cancel_url')
    template_params['kind_str'] = kind_str
    template_params['namespace_str'] = namespace_str
    template_params['xsrf_token'] = utils.CreateXsrfToken(XSRF_ACTION)
    template_params['notreadonly_warning'] = writes_enabled
    template_params['blob_warning'] = has_blobs
    template_params['high_replication_warning'] = is_high_replication
    utils.RenderToResponse(handler, 'confirm_copy.html', template_params)
class DoCopyHandler(webapp.RequestHandler):
"""Handler to deal with requests from the admin console to copy data."""
SUFFIX = 'copy.do'
COPY_HANDLER = (
'google.appengine.ext.datastore_admin.copy_handler.CopyEntity.map')
INPUT_READER = (
'google.appengine.ext.mapreduce.input_readers.ConsistentKeyReader')
MAPREDUCE_DETAIL = utils.config.MAPREDUCE_PATH + '/detail?mapreduce_id='
def get(self):
"""Handler for get requests to datastore_admin/copy.do.
Status of executed jobs is displayed.
"""
jobs = self.request.get('job', allow_multiple=True)
error = self.request.get('error', '')
xsrf_error = self.request.get('xsrf_error', '')
template_params = {
'job_list': jobs,
'mapreduce_detail': self.MAPREDUCE_DETAIL,
'error': error,
'xsrf_error': xsrf_error,
}
utils.RenderToResponse(self, 'do_copy.html', template_params)
def post(self):
"""Handler for post requests to datastore_admin/copy.do.
Jobs are executed and user is redirected to the get handler.
"""
namespace = self.request.get('namespace')
kinds = self.request.get('kind', allow_multiple=True)
(namespace_str, kinds_str) = utils.GetPrintableStrs(namespace, kinds)
token = self.request.get('xsrf_token')
remote_url = self.request.get('remote_url')
extra_header = self.request.get('extra_header')
jobs = []
if not remote_url:
parameters = [('error', 'Unspecified remote URL.')]
elif not utils.ValidateXsrfToken(token, XSRF_ACTION):
parameters = [('xsrf_error', '1')]
else:
try:
if extra_header:
extra_headers = dict([extra_header.split(':', 1)])
else:
extra_headers = None
target_app = remote_api_put_stub.get_remote_appid(remote_url,
extra_headers)
op = utils.StartOperation(
'Copying %s%s to %s' % (kinds_str, namespace_str, target_app))
name_template = 'Copy all %(kind)s objects%(namespace)s'
mapper_params = {
'target_app': target_app,
'remote_url': remote_url,
'extra_header': extra_header,
}
jobs = utils.RunMapForKinds(
op,
kinds,
name_template,
self.COPY_HANDLER,
self.INPUT_READER,
mapper_params)
error = ''
except Exception, e:
logging.exception('Handling exception.')
error = self._HandleException(e)
parameters = [('job', job) for job in jobs]
if error:
parameters.append(('error', error))
query = urllib.urlencode(parameters)
self.redirect('%s/%s?%s' % (utils.config.BASE_PATH, self.SUFFIX, query))
def _HandleException(self, e):
"""Make exception handling overrideable by tests.
In normal cases, return only the error string; do not fail to render the
page for user.
"""
return str(e)
class AllocateMaxIdPool(object):
  """Mapper pool to keep track of all allocated ids.

  Runs allocate_ids rpcs when flushed.

  This code uses knowledge of an allocate_id implementation detail. Though we
  don't plan to change the allocate_id logic, we don't really want to depend
  on it either. The detail is used here to implement batch-style remote
  allocate_ids.
  """

  def __init__(self, app_id):
    self.app_id = app_id
    # Maps a representative key path (tuple) to the largest id seen for it.
    self.key_path_to_max_id = {}

  def allocate_max_id(self, key):
    """Record the key to allocate max id.

    Keys whose last path element is a name rather than a numeric id are
    ignored.

    Args:
      key: Datastore key.
    """
    path = key.to_path()
    leaf_id = path[-1]
    if not isinstance(leaf_id, (int, long)):
      return

    if len(path) == 2:
      # Root entity: tracked per kind.
      slot = (path[0], 1)
      candidate = leaf_id
    else:
      # Child entity: tracked per root key, using the largest numeric id found
      # below the root.
      slot = (path[0], path[1], 'Foo', 1)
      numeric_ids = [element for element in path[2:]
                     if isinstance(element, (int, long))]
      candidate = max([0] + numeric_ids)

    recorded = self.key_path_to_max_id.get(slot, 0)
    self.key_path_to_max_id[slot] = max(candidate, recorded)

  def flush(self):
    """Reserve ids 1..max for every recorded path, then forget them all."""
    for path, maxid in self.key_path_to_max_id.items():
      range_key = db.Key.from_path(_app=self.app_id, *path)
      db.allocate_id_range(range_key, 1, maxid)
    self.key_path_to_max_id = {}
class AllocateMaxId(operation.Operation):
  """Mapper operation that records a key in the target app's id pool."""

  def __init__(self, key, app_id):
    """Remember the key to record and the application it is copied to.

    Args:
      key: Datastore key whose id should be recorded.
      app_id: id of the target application; also selects the pool name.
    """
    self.key = key
    self.app_id = app_id
    self.pool_id = 'allocate_max_id_%s_pool' % self.app_id

  def __call__(self, ctx):
    """Record self.key in the context's pool, creating the pool if needed.

    Args:
      ctx: context object providing get_pool() and register_pool().
    """
    existing_pool = ctx.get_pool(self.pool_id)
    if existing_pool:
      target_pool = existing_pool
    else:
      target_pool = AllocateMaxIdPool(self.app_id)
      ctx.register_pool(self.pool_id, target_pool)
    target_pool.allocate_max_id(self.key)
def FixKeys(entity_proto, app_id):
  """Go over keys in the given entity and update the application id.

  The entity's own key and every reference-valued property (in both the
  regular and the raw property lists) are rewritten in place.

  Args:
    entity_proto: An EntityProto to be fixed up. All identifiable keys in the
      proto will have the 'app' field reset to match app_id.
    app_id: The desired application id, typically os.getenv('APPLICATION_ID').
  """
  entity_proto.mutable_key().set_app(app_id)

  property_getters = (entity_proto.property_list,
                      entity_proto.raw_property_list)
  for get_properties in property_getters:
    for prop in get_properties():
      value = prop.mutable_value()
      if not value.has_referencevalue():
        continue
      value.mutable_referencevalue().set_app(app_id)
def KindPathFromKey(key):
  """Return kinds path as '/'-delimited string for a particular key.

  The key's path alternates kind and id-or-name, so the kinds are the
  elements at even positions.

  Args:
    key: object whose to_path() returns the flattened key path.

  Returns:
    The kinds of the path, root first, joined with '/'.
  """
  flattened = key.to_path()
  return '/'.join(flattened[::2])
def get_mapper_params():
  """Return the mapper params of the current mapreduce context.

  Kept as a separate function so tests can stub it out easily.
  """
  mapper_spec = context.get().mapreduce_spec.mapper
  return mapper_spec.params
class CopyEntity(object):
  """A class which contains a map handler to copy entities remotely.

  The class manages the connection: the remote put stub is configured once
  per instance, on first use.
  """

  def __init__(self):
    # Flipped to True by setup_stub once the remote stub is configured.
    self.remote_api_stub_initialized = False

  def setup_stub(self):
    """Set up the remote API stub from the mapper params; no-op if done."""
    if self.remote_api_stub_initialized:
      return

    params = get_mapper_params()
    raw_header = params.get('extra_header')
    extra_headers = {}
    if raw_header:
      # A single 'Name:value' string, split at the first colon.
      extra_headers = dict([raw_header.split(':', 1)])

    remote_api_put_stub.configure_remote_put(
        params['remote_url'], params['target_app'], extra_headers)
    self.remote_api_stub_initialized = True

  def map(self, key):
    """Copy data map handler.

    Args:
      key: Datastore entity key to copy, or the datastore.Entity itself.

    Yields:
      A db operation to store the entity in the target app.
      An AllocateMaxId operation recording the copied key.
      A counter operation incrementing the count for the entity kind.
    """
    if not self.remote_api_stub_initialized:
      self.setup_stub()

    target_app = get_mapper_params()['target_app']

    if isinstance(key, datastore.Entity):
      source_entity, key = key, key.key()
    else:
      source_entity = datastore.Get(key)

    # Re-home every key in the proto to the target app before rebuilding the
    # entity that gets written.
    proto = source_entity._ToPb()
    FixKeys(proto, target_app)
    copied_entity = datastore.Entity._FromPb(proto)

    yield operation.db.Put(copied_entity)
    yield AllocateMaxId(key, target_app)
    yield operation.counters.Increment(KindPathFromKey(key))
def handlers_list(base_path):
  """Return the (url pattern, handler class) pairs for this module.

  Args:
    base_path: path prefix; each handler's SUFFIX is appended after a '/'.

  Returns:
    A list with one tuple for ConfirmCopyHandler and one for DoCopyHandler,
    in that order.
  """
  handler_classes = (ConfirmCopyHandler, DoCopyHandler)
  return [(r'%s/%s' % (base_path, handler_class.SUFFIX), handler_class)
          for handler_class in handler_classes]