#!/usr/bin/env python
#
#  svnsync_authz_tests.py:  Tests SVNSync's repository mirroring
#                           capabilities that need to be run serially
#                           (mainly authz).
#
#  Subversion is a tool for revision control.
#  See https://subversion.apache.org for more information.
#
# ====================================================================
#    Licensed to the Apache Software Foundation (ASF) under one
#    or more contributor license agreements.  See the NOTICE file
#    distributed with this work for additional information
#    regarding copyright ownership.  The ASF licenses this file
#    to you under the Apache License, Version 2.0 (the
#    "License"); you may not use this file except in compliance
#    with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing,
#    software distributed under the License is distributed on an
#    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#    KIND, either express or implied.  See the License for the
#    specific language governing permissions and limitations
#    under the License.
######################################################################

# General modules
import sys, os

# Test suite-specific modules
import locale, re

# Our testing module
import svntest
from svntest.verify import SVNUnexpectedStdout, SVNUnexpectedStderr
from svntest.verify import SVNExpectedStderr
from svntest.main import write_restrictive_svnserve_conf
from svntest.main import write_authz_file
from svntest.main import server_has_partial_replay

# Shared helpers
from svnsync_tests import run_init, run_sync, run_test

# Short module-level aliases for the svntest.testcase decorators and for
# svntest.wc.StateItem.  Not every alias is referenced in this file.
Skip = svntest.testcase.Skip_deco
SkipUnless = svntest.testcase.SkipUnless_deco
XFail = svntest.testcase.XFail_deco
Issues = svntest.testcase.Issues_deco
Issue = svntest.testcase.Issue_deco
Wimp = svntest.testcase.Wimp_deco
Item = svntest.wc.StateItem

#----------------------------------------------------------------------
@Skip(svntest.main.is_ra_type_file)
def basic_authz(sbox):
  "verify that unreadable content is not synced"

  # Source: repository only (no working copy), using the restrictive
  # svnserve configuration.
  sbox.build(create_wc=False)
  write_restrictive_svnserve_conf(sbox.repo_dir)

  # Mirror: an empty dependent repository with revprop changes enabled.
  mirror = sbox.clone_dependent()
  mirror.build(create_wc=False, empty=True)
  svntest.actions.enable_revprop_changes(mirror.repo_dir)

  mirror_url = mirror.repo_url
  run_init(mirror_url, sbox.repo_url)

  # The authz file is written after initialisation: the source is readable
  # everywhere except /A/B, the mirror is readable and writable.
  source_name = sbox.authz_name()
  mirror_name = mirror.authz_name()
  rules = {}
  rules[source_name + ':/'] = '* = r'
  rules[source_name + ':/A/B'] = '* ='
  rules[mirror_name + ':/'] = '* = rw'
  write_authz_file(sbox, None, prefixed_rules=rules)

  run_sync(mirror_url)

  # Each entry: (path below the mirror root, expected stdout, expected stderr)
  cat_expectations = (
    # denied at the source, so 'cat' on the mirror must produce an error
    ('/A/B/lambda', [], svntest.verify.AnyOutput),
    # readable at the source, so 'cat' on the mirror must produce output
    ('/iota', svntest.verify.AnyOutput, []),
  )
  for rel_path, want_stdout, want_stderr in cat_expectations:
    svntest.actions.run_and_verify_svn(want_stdout, want_stderr,
                                       'cat', mirror_url + rel_path)

#----------------------------------------------------------------------
@Skip(svntest.main.is_ra_type_file)
def copy_from_unreadable_dir(sbox):
  "verify that copies from unreadable dirs work"

  sbox.build()

  verify_svn = svntest.actions.run_and_verify_svn

  # Put a property on the directory that is about to be copied and on a file
  # inside it, so we can later check that both reach the synced copy.
  for propname, propval, target in (
      ('foo', 'bar', sbox.wc_dir + '/A/B/lambda'),
      ('baz', 'zot', sbox.wc_dir + '/A/B'),
  ):
    verify_svn(None, [], 'pset', propname, propval, target)

  verify_svn(None, [], 'ci', sbox.wc_dir + '/A/B', '-m', 'log_msg')

  # Server-side copy of B to P; P is what shows up in the mirror.
  verify_svn(None, [], 'cp',
             sbox.repo_url + '/A/B',
             sbox.repo_url + '/A/P',
             '-m', 'Copy B to P')

  write_restrictive_svnserve_conf(sbox.repo_dir)

  # Empty mirror repository with revprop changes enabled.
  mirror = sbox.clone_dependent()
  mirror.build(create_wc=False, empty=True)
  svntest.actions.enable_revprop_changes(mirror.repo_dir)

  # Source: readable except /A/B (the copy source).  Mirror: read-write.
  source_name = sbox.authz_name()
  mirror_name = mirror.authz_name()
  rules = {}
  rules[source_name + ':/'] = '* = r'
  rules[source_name + ':/A/B'] = '* ='
  rules[mirror_name + ':/'] = '* = rw'
  write_authz_file(sbox, None, prefixed_rules=rules)

  run_init(mirror.repo_url, sbox.repo_url)
  run_sync(mirror.repo_url)

  # In the mirror, every path under /A/P is expected as a plain addition.
  expected_out = [
    'Changed paths:\n',
    '   A /A/P\n',
    '   A /A/P/E\n',
    '   A /A/P/E/alpha\n',
    '   A /A/P/E/beta\n',
    '   A /A/P/F\n',
    '   A /A/P/lambda\n',
    '\n',
    '\n', # log message is stripped
  ]

  log_args = ['log', '-r', '3', '-v', mirror.repo_url]
  _exit_code, log_out, log_err = svntest.main.run_svn(None, *log_args)
  if log_err:
    raise SVNUnexpectedStderr(log_err)

  # Skip the first two output lines (presumably separator and revision
  # header) and compare exactly as many lines as we expect.
  first = 2
  svntest.verify.compare_and_display_lines(
    None, 'LOG', expected_out, log_out[first:first + len(expected_out)])

  # Both properties set before the copy must be present on the mirror.
  for want_value, propname, rel_path in (
      (['bar\n'], 'foo', '/A/P/lambda'),
      (['zot\n'], 'baz', '/A/P'),
  ):
    verify_svn(want_value, [], 'pget', propname, mirror.repo_url + rel_path)

# Issue 2705.
@Issue(2705)
@Skip(svntest.main.is_ra_type_file)
def copy_with_mod_from_unreadable_dir(sbox):
  "verify copies with mods from unreadable dirs"

  sbox.build()

  verify_svn = svntest.actions.run_and_verify_svn
  wc = sbox.wc_dir

  # Working-copy operations, run in this order:
  #   1. copy directory B to P
  #   2. set a property inside the copy
  #   3. create a new directory inside the copy
  for svn_args in (
      ('cp', wc + '/A/B', wc + '/A/P'),
      ('pset', 'foo', 'bar', wc + '/A/P/lambda'),
      ('mkdir', wc + '/A/P/NEW-DIR'),
  ):
    verify_svn(None, [], *svn_args)

  # Create and schedule a new file inside the copy.
  new_file = wc + '/A/P/E/new-file'
  svntest.main.file_append(new_file, "bla bla")
  svntest.main.run_svn(None, 'add', new_file)

  # Remove a file from the copy, then commit everything in one revision.
  verify_svn(None, [], 'rm', wc + '/A/P/E/beta')
  verify_svn(None, [], 'ci', wc, '-m', 'log_msg')

  # Lock down the source repository.
  write_restrictive_svnserve_conf(sbox.repo_dir)

  # Empty mirror repository with revprop changes enabled.
  mirror = sbox.clone_dependent()
  mirror.build(create_wc=False, empty=True)
  svntest.actions.enable_revprop_changes(mirror.repo_dir)

  # Source: readable except /A/B (the copy source).  Mirror: read-write.
  source_name = sbox.authz_name()
  mirror_name = mirror.authz_name()
  rules = {}
  rules[source_name + ':/'] = '* = r'
  rules[source_name + ':/A/B'] = '* ='
  rules[mirror_name + ':/'] = '* = rw'
  write_authz_file(sbox, None, prefixed_rules=rules)

  run_init(mirror.repo_url, sbox.repo_url)
  run_sync(mirror.repo_url)

  # The deleted beta is absent; everything else under /A/P is an addition.
  expected_out = [
    'Changed paths:\n',
    '   A /A/P\n',
    '   A /A/P/E\n',
    '   A /A/P/E/alpha\n',
    '   A /A/P/E/new-file\n',
    '   A /A/P/F\n',
    '   A /A/P/NEW-DIR\n',
    '   A /A/P/lambda\n',
    '\n',
    '\n', # log message is stripped
  ]

  log_args = ['log', '-r', '2', '-v', mirror.repo_url]
  _exit_code, log_out, log_err = svntest.main.run_svn(None, *log_args)
  if log_err:
    raise SVNUnexpectedStderr(log_err)

  # Skip the first two output lines (presumably separator and revision
  # header) and compare exactly as many lines as we expect.
  first = 2
  svntest.verify.compare_and_display_lines(
    None, 'LOG', expected_out, log_out[first:first + len(expected_out)])

  # The property set inside the copy must be present on the mirror.
  verify_svn(['bar\n'], [], 'pget', 'foo', mirror.repo_url + '/A/P/lambda')

# Issue 2705.
@Issue(2705)
@Skip(svntest.main.is_ra_type_file)
def copy_with_mod_from_unreadable_dir_and_copy(sbox):
  "verify copies with mods from unreadable dirs +copy"

  sbox.build()

  verify_svn = svntest.actions.run_and_verify_svn
  wc = sbox.wc_dir

  # Two working-copy copies followed by a single commit:
  #   1. directory B -> P
  #   2. the (readable) file D/gamma into the copied directory P/E
  for copy_from, copy_to in (
      (wc + '/A/B', wc + '/A/P'),
      (wc + '/A/D/gamma', wc + '/A/P/E'),
  ):
    verify_svn(None, [], 'cp', copy_from, copy_to)

  verify_svn(None, [], 'ci', wc, '-m', 'log_msg')

  # Lock down the source repository.
  write_restrictive_svnserve_conf(sbox.repo_dir)

  # Empty mirror repository with revprop changes enabled.
  mirror = sbox.clone_dependent()
  mirror.build(create_wc=False, empty=True)
  svntest.actions.enable_revprop_changes(mirror.repo_dir)

  # Source: readable except /A/B (the copy source).  Mirror: read-write.
  source_name = sbox.authz_name()
  mirror_name = mirror.authz_name()
  rules = {}
  rules[source_name + ':/'] = '* = r'
  rules[source_name + ':/A/B'] = '* ='
  rules[mirror_name + ':/'] = '* = rw'
  write_authz_file(sbox, None, prefixed_rules=rules)

  run_init(mirror.repo_url, sbox.repo_url)
  run_sync(mirror.repo_url)

  # Only gamma, whose copy source is readable, is expected to keep its
  # copy-from information; the rest of /A/P shows up as plain additions.
  expected_out = [
    'Changed paths:\n',
    '   A /A/P\n',
    '   A /A/P/E\n',
    '   A /A/P/E/alpha\n',
    '   A /A/P/E/beta\n',
    '   A /A/P/E/gamma (from /A/D/gamma:1)\n',
    '   A /A/P/F\n',
    '   A /A/P/lambda\n',
    '\n',
    '\n', # log message is stripped
  ]

  log_args = ['log', '-r', '2', '-v', mirror.repo_url]
  _exit_code, log_out, log_err = svntest.main.run_svn(None, *log_args)
  if log_err:
    raise SVNUnexpectedStderr(log_err)

  # Skip the first two output lines (presumably separator and revision
  # header) and compare exactly as many lines as we expect.
  first = 2
  svntest.verify.compare_and_display_lines(
    None, 'LOG', expected_out, log_out[first:first + len(expected_out)])

def identity_copy(sbox):
  "copy UTF-8 svn:* props identically"

  sbox.build(create_wc = False)

  # Remember the current process-wide locale so that it can be restored
  # however this test ends (pass, failure or skip).
  orig_lc_all = locale.setlocale(locale.LC_ALL)

  # Candidate locales, tried in order until one can be set.
  other_locales = [ "English.1252", "German.1252", "French.1252",
                    "en_US.ISO-8859-1", "en_GB.ISO-8859-1", "de_DE.ISO-8859-1",
                    "en_US.ISO8859-1", "en_GB.ISO8859-1", "de_DE.ISO8859-1" ]

  try:
    for other_locale in other_locales:
      try:
        locale.setlocale(locale.LC_ALL, other_locale)
        break
      except locale.Error:
        # This candidate is not available here; try the next one.  Only
        # locale.Error is caught so that unrelated exceptions (for example
        # KeyboardInterrupt) are no longer silently swallowed.
        pass

    # Skip unless the locale now in effect is reported under exactly the
    # name we asked for.  This covers both "no candidate could be set" and
    # "a candidate was set but is reported under a different name"; in the
    # latter case the locale has already been changed, which is why this
    # check sits inside the try/finally that restores it.
    if locale.setlocale(locale.LC_ALL) != other_locale:
      raise svntest.Skip('Setting test locale failed')

    run_test(sbox, "copy-bad-encoding.expected.dump",
             exp_dump_file_name="copy-bad-encoding.expected.dump",
             bypass_prop_validation=True)
  finally:
    # Always put the original locale back.
    locale.setlocale(locale.LC_ALL, orig_lc_all)

@Skip(svntest.main.is_ra_type_file)
def specific_deny_authz(sbox):
  "verify if specifically denied paths dont sync"

  sbox.build()

  # Empty mirror repository with revprop changes enabled, initialised
  # before the source gets its extra revision.
  mirror = sbox.clone_dependent()
  mirror.build(create_wc=False, empty=True)
  svntest.actions.enable_revprop_changes(mirror.repo_dir)

  run_init(mirror.repo_url, sbox.repo_url)

  # Copy A to A_COPY in the working copy and commit the result.
  wc_root = sbox.wc_dir
  copy_source = os.path.join(wc_root, "A")
  copy_target = os.path.join(wc_root, "A_COPY")
  svntest.main.run_svn(None, "cp", copy_source, copy_target)
  svntest.main.run_svn(None, "ci", "-mm", wc_root)

  write_restrictive_svnserve_conf(sbox.repo_dir)

  if svntest.main.is_ra_type_dav():
    # For mod_dav_svn's parent path setup we need per-repos permissions in
    # the authz file...
    source_name = sbox.authz_name()
    mirror_name = mirror.authz_name()
    rules = {
      source_name + ':/':                '* = r',
      source_name + ':/A':               '* =',
      source_name + ':/A_COPY/B/lambda': '* =',
      mirror_name + ':/':                '* = rw',
    }
  else:
    # Otherwise we can just go with the permissions needed for the source
    # repository.
    rules = {
      '/':                '* = r',
      '/A':               '* =',
      '/A_COPY/B/lambda': '* =',
    }
  write_authz_file(sbox, None, prefixed_rules=rules)

  run_sync(mirror.repo_url)

  # lambda inside the copy is explicitly denied at the source, so 'cat' on
  # the mirror must produce an error and no output.
  denied_url = mirror.repo_url + '/A_COPY/B/lambda'
  svntest.actions.run_and_verify_svn([], svntest.verify.AnyOutput,
                                     'cat', denied_url)

@Issue(4121)
@Skip(svntest.main.is_ra_type_file)
def copy_delete_unreadable_child(sbox):
  "copy, then rm at-src-unreadable child"

  # Source: Greek tree (r1), then one svnmucc revision (r2) that copies the
  # root to 'branch' and deletes 'branch/A' again.
  sbox.build(create_wc=False)
  mucc_args = [
    '-m', 'r2',
    '-U', sbox.repo_url,
    'cp', 'HEAD', '/', 'branch',
    'rm', 'branch/A',
  ]
  svntest.actions.run_and_verify_svnmucc(None, [], *mucc_args)

  # Destination: empty repository with revprop changes enabled.
  mirror = sbox.clone_dependent()
  mirror.build(create_wc=False, empty=True)
  svntest.actions.enable_revprop_changes(mirror.repo_dir)

  # Lock down the source: readable everywhere except /A.
  write_restrictive_svnserve_conf(sbox.repo_dir, anon_access='read')
  source_name = sbox.authz_name()
  rules = {}
  rules[source_name + ':/'] = '* = r'
  rules[source_name + ':/A'] = '* ='
  write_authz_file(sbox, None, prefixed_rules=rules)

  # The mirror is addressed through its file-protocol URL.
  mirror_url = mirror.file_protocol_repo_url()
  run_init(mirror_url, sbox.repo_url)
  run_sync(mirror_url)

  # Sanity check: the synced branch at r2 lists only iota.
  svntest.actions.run_and_verify_svn(["iota\n"], [],
                                     'ls', mirror_url + '/branch@2')


########################################################################
# Run the tests


# All tests defined in this file, in the order they are handed to the test
# runner.  The list must start with None (presumably so that the first real
# test has index 1 -- confirm against svntest.main.run_tests).
test_list = [ None,
              basic_authz,
              copy_from_unreadable_dir,
              copy_with_mod_from_unreadable_dir,
              copy_with_mod_from_unreadable_dir_and_copy,
              identity_copy,
              specific_deny_authz,
              copy_delete_unreadable_child,
             ]
# Passed to run_tests() below; per the file header these tests need to be
# run serially.
serial_only = True

if __name__ == '__main__':
  svntest.main.run_tests(test_list, serial_only = serial_only)
  # NOTREACHED


### End of file.
