# -*- coding: utf-8 -*-
# test_backend.py -- from the ubuntuone-control-panel source package.

# Authors: Alejandro J. Cura <alecu@canonical.com>
# Authors: Natalia B. Bidart <nataliabidart@canonical.com>
#
# Copyright 2010 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Tests for the control panel backend."""

from collections import defaultdict

import simplejson

from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks, returnValue
from ubuntuone.devtools.handlers import MementoHandler

from ubuntuone.controlpanel import backend, replication_client
from ubuntuone.controlpanel.backend import (bool_str,
    ACCOUNT_API, DEVICES_API, DEVICE_REMOVE_API, QUOTA_API,
    DEVICE_TYPE_COMPUTER,
    FILE_SYNC_DISABLED,
    FILE_SYNC_DISCONNECTED,
    FILE_SYNC_ERROR,
    FILE_SYNC_IDLE,
    FILE_SYNC_STARTING,
    FILE_SYNC_STOPPED,
    FILE_SYNC_SYNCING,
    FILE_SYNC_UNKNOWN,
    MSG_KEY, STATUS_KEY,
)
from ubuntuone.controlpanel.tests import (TestCase,
    EMPTY_DESCRIPTION_JSON,
    EXPECTED_ACCOUNT_INFO,
    EXPECTED_ACCOUNT_INFO_WITH_CURRENT_PLAN,
    EXPECTED_DEVICES_INFO,
    LOCAL_DEVICE,
    ROOT_PATH,
    SAMPLE_ACCOUNT_NO_CURRENT_PLAN,
    SAMPLE_ACCOUNT_WITH_CURRENT_PLAN,
    SAMPLE_DEVICES_JSON,
    SAMPLE_FOLDERS,
    SAMPLE_QUOTA_JSON,
    SAMPLE_SHARED,
    SAMPLE_SHARES,
    SHARES_PATH,
    SHARES_PATH_LINK,
    TOKEN,
)


class MockWebClient(object):
    """A mock webclient.

    Behaviour is driven by two attributes:

    ``failure``
        Falsy means success.  ``401`` makes ``call_api`` fail with
        ``backend.UnauthorizedError``; any other truthy value makes it
        fail with ``backend.WebClientError``.
    ``results``
        Maps the 'method' given to ``call_api`` to the JSON string whose
        decoded value is returned on success.

    """

    failure = False
    # NOTE: defined on the class, so entries stored through one instance
    # are visible through every other instance as well.
    results = {}

    def __init__(self, get_credentials):
        """Initialize this mock instance, storing 'get_credentials'."""
        self.get_credentials = get_credentials

    def call_api(self, method):
        """Get a given url from the webservice.

        Return an already-fired deferred: the decoded JSON stored in
        'results' for 'method' on success, or the error selected by
        'failure' otherwise.

        """
        if self.failure == 401:
            return defer.fail(backend.UnauthorizedError(self.failure))
        if self.failure:
            return defer.fail(backend.WebClientError(self.failure))

        result = simplejson.loads(self.results[method])
        return defer.succeed(result)


class MockDBusClient(object):
    """A mock dbus_client module.

    State is declared as class attributes.  Several methods read or write
    it through ``MockDBusClient`` itself rather than through ``self``, so
    that part of the state is shared by every instance.

    """

    creds = TOKEN
    throttling = False
    show_all_notifications = True
    # NOTE: class-level dict; 'set_throttling_limits' updates it in place
    # unless 'limits' was rebound on the instance first.
    limits = {"download": -1, "upload": -1}
    file_sync = True
    status = {
        'name': 'TEST', 'queues': 'GORGEOUS', 'connection': '',
        'description': 'Some test state, nothing else.',
        'is_error': '', 'is_connected': 'True', 'is_online': '',
    }
    status_changed_handler = None
    subscribed_folders = []
    subscribed_shares = []
    actions = []
    shares = []
    folders = []

    def get_credentials(self):
        """Return a fired deferred with the mock credentials."""
        return defer.succeed(self.creds)

    def clear_credentials(self):
        """Clear the mock credentials (on the class, not the instance)."""
        MockDBusClient.creds = None
        return defer.succeed(None)

    def get_throttling_limits(self):
        """Return the sample speed limits."""
        return self.limits

    def set_throttling_limits(self, limits):
        """Set the sample speed limits, converting the values to int."""
        self.limits["download"] = int(limits["download"])
        self.limits["upload"] = int(limits["upload"])

    def bandwidth_throttling_enabled(self):
        """Return the state of throttling."""
        return self.throttling

    def enable_bandwidth_throttling(self):
        """Enable bw throttling."""
        self.throttling = True

    def disable_bandwidth_throttling(self):
        """Disable bw throttling."""
        self.throttling = False

    def show_all_notifications_enabled(self):
        """Return the state of show_all_notifications."""
        return self.show_all_notifications

    def enable_show_all_notifications(self):
        """Enable show_all_notifications."""
        self.show_all_notifications = True

    def disable_show_all_notifications(self):
        """Disable show_all_notifications."""
        self.show_all_notifications = False

    def files_sync_enabled(self):
        """Get if file sync service is enabled."""
        return MockDBusClient.file_sync

    def set_files_sync_enabled(self, enabled):
        """Set the file sync service to be 'enabled' (on the class)."""
        MockDBusClient.file_sync = enabled

    def connect_file_sync(self):
        """Connect file_sync service: record 'connect' in 'actions'."""
        MockDBusClient.actions.append('connect')

    def disconnect_file_sync(self):
        """Disconnect file_sync service: record 'disconnect' in 'actions'."""
        MockDBusClient.actions.append('disconnect')

    def start_file_sync(self):
        """Start the file_sync service: record 'start' in 'actions'."""
        MockDBusClient.actions.append('start')

    def stop_file_sync(self):
        """Stop the file_sync service: record 'stop' in 'actions'."""
        MockDBusClient.actions.append('stop')

    def get_root_dir(self):
        """Grab the root dir."""
        return ROOT_PATH

    def get_shares_dir(self):
        """Grab the shares dir."""
        return SHARES_PATH

    def get_shares_dir_link(self):
        """Grab the shares dir link."""
        return SHARES_PATH_LINK

    def get_folders(self):
        """Grab list of folders."""
        return MockDBusClient.folders

    def subscribe_folder(self, volume_id):
        """Subscribe to 'volume_id'."""
        MockDBusClient.subscribed_folders.append(volume_id)

    def unsubscribe_folder(self, volume_id):
        """Unsubscribe from 'volume_id'."""
        MockDBusClient.subscribed_folders.remove(volume_id)

    def get_shares(self):
        """Grab list of shares."""
        return MockDBusClient.shares

    def subscribe_share(self, volume_id):
        """Subscribe to 'volume_id'."""
        MockDBusClient.subscribed_shares.append(volume_id)

    def unsubscribe_share(self, volume_id):
        """Unsubscribe from 'volume_id'."""
        MockDBusClient.subscribed_shares.remove(volume_id)

    def get_current_status(self):
        """Grab syncdaemon status."""
        return self.status

    def set_status_changed_handler(self, handler):
        """Connect a handler for tracking syncdaemon status changes."""
        self.status_changed_handler = handler

    def get_shared(self):
        """Grab list of shared (shares from the user to others)."""
        return SAMPLE_SHARED


class MockReplicationClient(object):
    """A mock replication_client module.

    'replications' and 'exclusions' are class-level sets, so changes made
    through 'replicate' and 'exclude' are shared by every instance.

    """

    BOOKMARKS = 'awesome'
    CONTACTS = 'legendary'

    replications = set([BOOKMARKS, CONTACTS, 'other'])
    exclusions = set([CONTACTS])

    def _check_known(self, replication_id):
        """Raise ReplicationError if 'replication_id' is not a replication."""
        if replication_id not in MockReplicationClient.replications:
            raise replication_client.ReplicationError(replication_id)

    def get_replications(self):
        """Grab the list of replications in this machine."""
        return MockReplicationClient.replications

    def get_exclusions(self):
        """Grab the list of exclusions in this machine."""
        return MockReplicationClient.exclusions

    def replicate(self, replication_id):
        """Remove replication_id from the exclusions list.

        Raise ReplicationError for an unknown 'replication_id'.  A known id
        that is not currently excluded makes 'set.remove' raise KeyError.

        """
        self._check_known(replication_id)
        MockReplicationClient.exclusions.remove(replication_id)

    def exclude(self, replication_id):
        """Add replication_id to the exclusions list.

        Raise ReplicationError for an unknown 'replication_id'.

        """
        self._check_known(replication_id)
        MockReplicationClient.exclusions.add(replication_id)


class BackendBasicTestCase(TestCase):
    """Simple tests for the backend."""

    timeout = 3

    def setUp(self):
        """Patch the backend collaborators with mocks and build 'self.be'."""
        super(BackendBasicTestCase, self).setUp()
        self.patch(backend, "WebClient", MockWebClient)
        self.patch(backend, "dbus_client", MockDBusClient())
        self.patch(backend, "replication_client", MockReplicationClient())
        self.local_token = DEVICE_TYPE_COMPUTER + TOKEN["token"]
        self.be = backend.ControlBackend()

        self.memento = MementoHandler()
        backend.logger.addHandler(self.memento)
        # Detach the handler afterwards; otherwise every test leaves one
        # more handler attached to the shared backend logger.
        self.addCleanup(backend.logger.removeHandler, self.memento)

        # 'clear_credentials' writes to the class, so reset it per test.
        MockDBusClient.creds = TOKEN

    def test_backend_creation(self):
        """The backend instance is successfully created."""
        self.assertEqual(self.be.wc.__class__, MockWebClient)

    @inlineCallbacks
    def test_get_token(self):
        """The get_token method returns the right token."""
        token = yield self.be.get_token()
        self.assertEqual(token, TOKEN["token"])

    @inlineCallbacks
    def test_device_is_local(self):
        """The device_is_local returns the right result."""
        is_local = yield self.be.device_is_local(self.local_token)
        self.assertTrue(is_local)

        did = EXPECTED_DEVICES_INFO[-1]['device_id']
        is_local = yield self.be.device_is_local(did)
        self.assertFalse(is_local)

    def test_shutdown_func(self):
        """A shutdown_func can be passed as creation parameter."""

        def noop():
            """Do nothing."""

        be = backend.ControlBackend(shutdown_func=noop)
        self.assertEqual(be.shutdown_func, noop)

    def test_shutdown_func_is_called_on_shutdown(self):
        """The shutdown_func is called on shutdown."""
        self.be.shutdown_func = self._set_called
        self.be.shutdown()
        self.assertEqual(self._called, ((), {}))

    def test_shutdown_func_when_none(self):
        """The shutdown_func can be None."""
        self.be.shutdown_func = None
        self.be.shutdown()
        # nothing explodes


class BackendAccountTestCase(BackendBasicTestCase):
    """Account tests for the backend."""

    @inlineCallbacks
    def test_account_info(self):
        """The account_info method exercises its callback."""
        # pylint: disable=E1101
        self.be.wc.results[ACCOUNT_API] = SAMPLE_ACCOUNT_NO_CURRENT_PLAN
        self.be.wc.results[QUOTA_API] = SAMPLE_QUOTA_JSON
        result = yield self.be.account_info()
        self.assertEqual(result, EXPECTED_ACCOUNT_INFO)

    @inlineCallbacks
    def test_account_info_with_current_plan(self):
        """The account_info method handles an account with current plan."""
        # pylint: disable=E1101
        self.be.wc.results[ACCOUNT_API] = SAMPLE_ACCOUNT_WITH_CURRENT_PLAN
        self.be.wc.results[QUOTA_API] = SAMPLE_QUOTA_JSON
        result = yield self.be.account_info()
        self.assertEqual(result, EXPECTED_ACCOUNT_INFO_WITH_CURRENT_PLAN)

    @inlineCallbacks
    def test_account_info_fails(self):
        """The account_info method exercises its errback."""
        # pylint: disable=E1101
        self.be.wc.failure = 404
        yield self.assertFailure(self.be.account_info(),
                                 backend.WebClientError)

    @inlineCallbacks
    def test_account_info_fails_with_unauthorized(self):
        """The account_info clears the credentials on unauthorized."""
        # pylint: disable=E1101
        self.be.wc.failure = 401
        # fired by the patched clear_credentials; yielding it at the end
        # makes the test time out if the credentials are never cleared
        d = defer.Deferred()
        self.patch(backend.dbus_client, 'clear_credentials',
                   lambda: d.callback('called'))
        yield self.assertFailure(self.be.account_info(),
                                 backend.UnauthorizedError)
        yield d


class BackendDevicesTestCase(BackendBasicTestCase):
    """Devices tests for the backend."""

    @inlineCallbacks
    def test_devices_info(self):
        """The devices_info method exercises its callback."""
        # pylint: disable=E1101
        self.be.wc.results[DEVICES_API] = SAMPLE_DEVICES_JSON
        result = yield self.be.devices_info()
        self.assertEqual(result, EXPECTED_DEVICES_INFO)

    @inlineCallbacks
    def test_devices_info_fails(self):
        """The devices_info method exercises its errback."""
        def fail(*args, **kwargs):
            """Raise any error other than WebClientError."""
            raise ValueError(args)

        self.patch(self.be.wc, 'call_api', fail)
        yield self.assertFailure(self.be.devices_info(), ValueError)

    @inlineCallbacks
    def test_devices_info_with_webclient_error(self):
        """The devices_info returns local info if webclient error."""
        # pylint: disable=E1101
        self.be.wc.failure = 404
        result = yield self.be.devices_info()

        self.assertEqual(result, [LOCAL_DEVICE])
        self.assertTrue(self.memento.check_error('devices_info',
                                                 'web client failure'))

    @inlineCallbacks
    def test_devices_info_fails_with_unauthorized(self):
        """The devices_info clears the credentials on unauthorized."""
        # pylint: disable=E1101
        self.be.wc.failure = 401
        # fired by the patched clear_credentials; yielding it at the end
        # makes the test time out if the credentials are never cleared
        d = defer.Deferred()
        self.patch(backend.dbus_client, 'clear_credentials',
                   lambda: d.callback('called'))
        yield self.assertFailure(self.be.devices_info(),
                                 backend.UnauthorizedError)
        yield d

    @inlineCallbacks
    def test_devices_info_if_files_disable(self):
        """The devices_info returns device only info if files is disabled."""
        # The mock stores the file sync flag on the class, so restore it
        # once the test is done (presumably disable_files ends up calling
        # set_files_sync_enabled on the mock).
        self.addCleanup(setattr, MockDBusClient, 'file_sync',
                        MockDBusClient.file_sync)
        yield self.be.disable_files()
        status = yield self.be.file_sync_status()
        self.assertEqual(status['status'], backend.FILE_SYNC_DISABLED, status)

        # pylint: disable=E1101
        self.be.wc.results[DEVICES_API] = SAMPLE_DEVICES_JSON
        result = yield self.be.devices_info()

        # Copy every device dict: a plain slice shares the dicts with
        # EXPECTED_DEVICES_INFO, and popping keys from them would corrupt
        # the fixture for every test that runs afterwards.
        expected = [device.copy() for device in EXPECTED_DEVICES_INFO]
        for device in expected:
            device.pop('limit_bandwidth', None)
            device.pop('max_download_speed', None)
            device.pop('max_upload_speed', None)
            device.pop('show_all_notifications', None)
            device['configurable'] = ''
        self.assertEqual(result, expected)

    @inlineCallbacks
    def test_devices_info_when_token_name_is_empty(self):
        """The devices_info can handle empty token names."""
        # pylint: disable=E1101
        self.be.wc.results[DEVICES_API] = EMPTY_DESCRIPTION_JSON
        result = yield self.be.devices_info()
        expected = {'configurable': '',
                    'device_id': 'ComputerABCDEF01234token',
                    'is_local': '', 'name': 'None',
                    'type': DEVICE_TYPE_COMPUTER}
        self.assertEqual(result, [expected])
        self.assertTrue(self.memento.check_warning('name', 'None'))

    @inlineCallbacks
    def test_devices_info_does_not_log_device_id(self):
        """The devices_info does not log the device_id."""
        # pylint: disable=E1101
        self.be.wc.results[DEVICES_API] = SAMPLE_DEVICES_JSON
        yield self.be.devices_info()

        dids = (d['device_id'] for d in EXPECTED_DEVICES_INFO)
        device_id_not_logged = all(all(did not in r.getMessage()
                                       for r in self.memento.records)
                                   for did in dids)
        self.assertTrue(device_id_not_logged)

    @inlineCallbacks
    def test_remove_device(self):
        """The remove_device method calls the right api."""
        dtype, did = DEVICE_TYPE_COMPUTER, "SAMPLE-TOKEN"
        device_id = dtype + did
        apiurl = DEVICE_REMOVE_API % (dtype.lower(), did)
        # pylint: disable=E1101
        self.be.wc.results[apiurl] = SAMPLE_DEVICES_JSON
        result = yield self.be.remove_device(device_id)
        self.assertEqual(result, device_id)
        # credentials were not cleared
        self.assertEqual(MockDBusClient.creds, TOKEN)

    @inlineCallbacks
    def test_remove_device_clear_credentials_if_local_device(self):
        """The remove_device method clears the credentials if is local."""
        apiurl = DEVICE_REMOVE_API % ('computer', TOKEN['token'])
        # pylint: disable=E1101
        self.be.wc.results[apiurl] = SAMPLE_DEVICES_JSON
        yield self.be.remove_device(self.local_token)
        # credentials were cleared
        self.assertEqual(MockDBusClient.creds, None)

    @inlineCallbacks
    def test_remove_device_fails(self):
        """The remove_device method fails as expected."""
        # pylint: disable=E1101
        self.be.wc.failure = 404
        yield self.assertFailure(self.be.remove_device(self.local_token),
                                 backend.WebClientError)

    @inlineCallbacks
    def test_remove_device_fails_with_unauthorized(self):
        """The remove_device clears the credentials on unauthorized."""
        # pylint: disable=E1101
        self.be.wc.failure = 401
        # fired by the patched clear_credentials; yielding it at the end
        # makes the test time out if the credentials are never cleared
        d = defer.Deferred()
        self.patch(backend.dbus_client, 'clear_credentials',
                   lambda: d.callback('called'))
        yield self.assertFailure(self.be.remove_device(self.local_token),
                                 backend.UnauthorizedError)
        yield d

    @inlineCallbacks
    def test_remove_device_does_not_log_device_id(self):
        """The remove_device does not log the device_id."""
        # NOTE(review): no result is registered for the removal url here;
        # this looks like it relies on the entry another test left in the
        # class-level MockWebClient.results -- confirm.
        device_id = DEVICE_TYPE_COMPUTER + TOKEN['token']
        yield self.be.remove_device(device_id)

        device_id_not_logged = all(device_id not in r.getMessage()
                                   for r in self.memento.records)
        self.assertTrue(device_id_not_logged)

    @inlineCallbacks
    def test_change_show_all_notifications(self):
        """The show_all_notifications setting is updated."""
        backend.dbus_client.show_all_notifications = False
        yield self.be.change_device_settings(self.local_token,
                                        {"show_all_notifications": 'True'})
        self.assertEqual(backend.dbus_client.show_all_notifications, True)
        yield self.be.change_device_settings(self.local_token,
                                        {"show_all_notifications": ''})
        self.assertEqual(backend.dbus_client.show_all_notifications, False)

    @inlineCallbacks
    def test_change_limit_bandwidth(self):
        """The limit_bandwidth setting is updated."""
        backend.dbus_client.throttling = False
        yield self.be.change_device_settings(self.local_token,
                                        {"limit_bandwidth": 'True'})
        self.assertEqual(backend.dbus_client.throttling, True)
        yield self.be.change_device_settings(self.local_token,
                                        {"limit_bandwidth": ''})
        self.assertEqual(backend.dbus_client.throttling, False)

    @inlineCallbacks
    def test_change_upload_speed_limit(self):
        """The max_upload_speed setting is updated."""
        backend.dbus_client.limits = {"download": -1, "upload": -1}
        yield self.be.change_device_settings(self.local_token,
                                        {"max_upload_speed": "1111"})
        self.assertEqual(backend.dbus_client.limits["upload"], 1111)
        self.assertEqual(backend.dbus_client.limits["download"], -1)

    @inlineCallbacks
    def test_change_download_speed_limit(self):
        """The max_download_speed setting is updated."""
        backend.dbus_client.limits = {"download": -1, "upload": -1}
        yield self.be.change_device_settings(self.local_token,
                                        {"max_download_speed": "99"})
        self.assertEqual(backend.dbus_client.limits["upload"], -1)
        self.assertEqual(backend.dbus_client.limits["download"], 99)

    @inlineCallbacks
    def test_changing_settings_for_wrong_id_has_no_effect(self):
        """If the id is wrong, no settings are changed."""
        backend.dbus_client.throttling = False
        backend.dbus_client.limits = {"download": -1, "upload": -1}
        new_settings = {
            "max_download_speed": "99",
            "max_upload_speed": "99",
            "limit_bandwidth": 'True',
        }
        yield self.be.change_device_settings("wrong token!", new_settings)
        self.assertEqual(backend.dbus_client.throttling, False)
        self.assertEqual(backend.dbus_client.limits["upload"], -1)
        self.assertEqual(backend.dbus_client.limits["download"], -1)

    @inlineCallbacks
    def test_changing_settings_does_not_log_device_id(self):
        """The change_device_settings does not log the device_id."""
        device_id = 'yadda-yadda'
        yield self.be.change_device_settings(device_id, {})

        device_id_not_logged = all(device_id not in r.getMessage()
                                   for r in self.memento.records)
        self.assertTrue(device_id_not_logged)


class BackendVolumesTestCase(BackendBasicTestCase):
    """Volumes tests for the backend."""

    def setUp(self):
        """Register the account and quota results used for free bytes."""
        super(BackendVolumesTestCase, self).setUp()
        # fake quota info and calculate free bytes
        # pylint: disable=E1101
        self.be.wc.results[ACCOUNT_API] = SAMPLE_ACCOUNT_NO_CURRENT_PLAN
        self.be.wc.results[QUOTA_API] = SAMPLE_QUOTA_JSON

    @inlineCallbacks
    def expected_volumes(self, sample_shares, sample_folders):
        """Get shares and group by sharing user, get folders and free space.

        Build the value the tests expect from volumes_info: a list of
        (user name, free bytes, volumes) tuples.  The first entry holds the
        root volume plus 'sample_folders'; one entry per sharing user
        follows, built from the accepted shares in 'sample_shares'.

        """
        try:
            result = yield self.be.account_info()
        except Exception:  # pylint: disable=W0703
            free_bytes = self.be.FREE_BYTES_NOT_AVAILABLE
        else:
            free_bytes = int(result['quota_total']) - int(result['quota_used'])

        # get root dir info
        root_volume = {u'volume_id': u'', u'path': ROOT_PATH,
                       u'subscribed': 'True', u'type': self.be.ROOT_TYPE}

        # get shares and group by sharing user
        shares = defaultdict(list)
        for share in sample_shares:
            # filter out non accepted values
            if not share['accepted']:
                continue

            # work on a copy so the sample data is left untouched
            share = share.copy()

            nicer_path = share[u'path'].replace(SHARES_PATH, SHARES_PATH_LINK)
            share[u'path'] = nicer_path
            share[u'type'] = self.be.SHARE_TYPE

            username = share['other_visible_name']
            if not username:
                username = share['other_username'] + \
                           ' (%s)' % self.be.NAME_NOT_SET

            shares[username].append(share)

        folders = []
        for folder in sample_folders:
            folder = folder.copy()
            folder[u'type'] = self.be.FOLDER_TYPE
            folders.append(folder)

        expected = [(u'', unicode(free_bytes), [root_volume] + folders)]
        for other_user, data in shares.iteritems():
            # free bytes are only expected when some share is modifiable
            send_freebytes = any(d['access_level'] == 'Modify' for d in data)
            if send_freebytes:
                share_free_bytes = data[0][u'free_bytes']
            else:
                share_free_bytes = self.be.FREE_BYTES_NOT_AVAILABLE
            expected.append((other_user, share_free_bytes, data))

        returnValue(expected)

    @inlineCallbacks
    def test_volumes_info(self):
        """The volumes_info method exercises its callback."""
        self.patch(MockDBusClient, 'shares', SAMPLE_SHARES)
        self.patch(MockDBusClient, 'folders', SAMPLE_FOLDERS)

        expected = yield self.expected_volumes(SAMPLE_SHARES, SAMPLE_FOLDERS)
        result = yield self.be.volumes_info()
        self.assertEqual(result, expected)

    # pylint: disable=W0212

    def test_cached_volumes_are_initially_empty(self):
        """The cached volume list is empty."""
        self.assertEqual(self.be._volumes, {})

    @inlineCallbacks
    def test_volumes_are_cached(self):
        """The volume list is cached."""
        self.patch(MockDBusClient, 'shares', SAMPLE_SHARES)
        self.patch(MockDBusClient, 'folders', SAMPLE_FOLDERS)

        # get root dir info
        root_volume = {u'volume_id': u'', u'path': ROOT_PATH,
                       u'subscribed': 'True', u'type': self.be.ROOT_TYPE}
        expected = {u'': root_volume}
        for volume in SAMPLE_SHARES + SAMPLE_FOLDERS:
            volume = volume.copy()
            if volume[u'type'] == u'UDF':
                volume[u'type'] = self.be.FOLDER_TYPE
            else:
                if not volume[u'accepted']:
                    continue
                path = volume[u'path']
                nicer_path = path.replace(SHARES_PATH, SHARES_PATH_LINK)
                volume[u'path'] = nicer_path
                volume[u'type'] = self.be.SHARE_TYPE

            sid = volume['volume_id']
            # sanity check on the sample data: volume ids must be unique
            self.assertTrue(sid not in expected, sid)
            expected[sid] = volume

        _ = yield self.be.volumes_info()

        self.assertEqual(self.be._volumes, expected)

    @inlineCallbacks
    def test_cached_volumes_are_updated_with_volume_info(self):
        """The cached volume list is updated."""
        # running the check twice shows a second volumes_info call leaves
        # the cache with the same expected content
        yield self.test_volumes_are_cached()
        yield self.test_volumes_are_cached()

    @inlineCallbacks
    def test_volumes_info_no_quota_for_read_onlys(self):
        """The volumes_info does not send quota info for RO shares."""
        read_only_shares = [
            {u'accepted': u'True',
             u'subscribed': u'True',
             u'access_level': u'View',
             u'free_bytes': u'39892622746',
             u'generation': u'2704',
             u'name': u're',
             u'node_id': u'c483f419-ed28-490a-825d-a8c074e2d795',
             u'other_username': u'otheruser',
             u'other_visible_name': u'Other User',
             u'path': SHARES_PATH + u'/re from Other User',
             u'type': u'Share',
             u'volume_id': u'4a1b263b-a2b3-4f66-9e66-4cd18050810d'},
            {u'accepted': u'True',
             u'subscribed': u'True',
             u'access_level': u'View',
             u'free_bytes': u'39892622746',
             u'generation': u'2704',
             u'name': u'do',
             u'node_id': u'84544ea4-aefe-4f91-9bb9-ed7b0a805baf',
             u'other_username': u'otheruser',
             u'other_visible_name': u'Other User',
             u'path': SHARES_PATH + u'/do from Other User',
             u'type': u'Share',
             u'volume_id': u'7d130dfe-98b2-4bd5-8708-9eeba9838ac0'},
        ]

        self.patch(MockDBusClient, 'shares', read_only_shares)
        self.patch(MockDBusClient, 'folders', SAMPLE_FOLDERS)

        expected = yield self.expected_volumes(read_only_shares,
                                               SAMPLE_FOLDERS)
        result = yield self.be.volumes_info()
        self.assertEqual(result, expected)

    @inlineCallbacks
    def test_volumes_info_no_quota_for_root(self):
        """The volumes_info returns info even if quota call fails."""
        # pylint: disable=E1101
        self.be.wc.failure = 500
        self.patch(MockDBusClient, 'shares', SAMPLE_SHARES)
        self.patch(MockDBusClient, 'folders', SAMPLE_FOLDERS)

        expected = yield self.expected_volumes(SAMPLE_SHARES, SAMPLE_FOLDERS)
        result = yield self.be.volumes_info()
        self.assertEqual(result, expected)

    @inlineCallbacks
    def test_subscribe_volume_folder(self):
        """The subscribe_volume method subscribes a folder."""
        fid = '0123-4567'
        self.be._volumes[fid] = {u'type': self.be.FOLDER_TYPE}

        yield self.be.subscribe_volume(volume_id=fid)
        # the mock keeps subscriptions on the class: undo it afterwards
        self.addCleanup(lambda: MockDBusClient.subscribed_folders.remove(fid))

        self.assertEqual(MockDBusClient.subscribed_folders, [fid])

    @inlineCallbacks
    def test_unsubscribe_volume_folder(self):
        """The unsubscribe_volume method unsubscribes a folder."""
        fid = '0123-4567'
        self.be._volumes[fid] = {u'type': self.be.FOLDER_TYPE}

        yield self.be.subscribe_volume(volume_id=fid)
        self.assertEqual(MockDBusClient.subscribed_folders, [fid])

        yield self.be.unsubscribe_volume(volume_id=fid)

        self.assertEqual(MockDBusClient.subscribed_folders, [])

    @inlineCallbacks
    def test_subscribe_volume_share(self):
        """The subscribe_volume method subscribes a share."""
        sid = '0123-4567'
        self.be._volumes[sid] = {u'type': self.be.SHARE_TYPE}

        yield self.be.subscribe_volume(volume_id=sid)
        # the mock keeps subscriptions on the class: undo it afterwards
        self.addCleanup(lambda: MockDBusClient.subscribed_shares.remove(sid))

        self.assertEqual(MockDBusClient.subscribed_shares, [sid])

    @inlineCallbacks
    def test_unsubscribe_volume_share(self):
        """The unsubscribe_volume method unsubscribes a share."""
        sid = '0123-4567'
        self.be._volumes[sid] = {u'type': self.be.SHARE_TYPE}

        yield self.be.subscribe_volume(volume_id=sid)
        self.assertEqual(MockDBusClient.subscribed_shares, [sid])

        yield self.be.unsubscribe_volume(volume_id=sid)

        self.assertEqual(MockDBusClient.subscribed_shares, [])

    @inlineCallbacks
    def test_change_volume_settings_folder(self):
        """The 'subscribed' setting of a folder can be changed."""
        fid = '0123-4567'
        self.be._volumes[fid] = {u'type': self.be.FOLDER_TYPE}

        yield self.be.change_volume_settings(fid, {'subscribed': 'True'})
        self.assertEqual(MockDBusClient.subscribed_folders, [fid])

        yield self.be.change_volume_settings(fid, {'subscribed': ''})
        self.assertEqual(MockDBusClient.subscribed_folders, [])

    @inlineCallbacks
    def test_change_volume_settings_share(self):
        """The 'subscribed' setting of a share can be changed."""
        sid = '0123-4567'
        self.be._volumes[sid] = {u'type': self.be.SHARE_TYPE}

        yield self.be.change_volume_settings(sid, {'subscribed': 'True'})
        self.assertEqual(MockDBusClient.subscribed_shares, [sid])

        yield self.be.change_volume_settings(sid, {'subscribed': ''})
        self.assertEqual(MockDBusClient.subscribed_shares, [])

    @inlineCallbacks
    def test_change_volume_settings_no_setting(self):
        """The change volume settings does not fail on empty settings."""
        yield self.be.change_volume_settings('test', {})
        self.assertEqual(MockDBusClient.subscribed_folders, [])
        self.assertEqual(MockDBusClient.subscribed_shares, [])


00800 class BackendSyncStatusTestCase(BackendBasicTestCase):
    """Syncdaemon state for the backend."""

    was_disabled = False

    def setUp(self):
        """Seed the backend's 'file_sync_disabled' flag with 'was_disabled'."""
        super(BackendSyncStatusTestCase, self).setUp()
        self.be.file_sync_disabled = self.was_disabled

00809     def _build_msg(self):
        """Build expected message regarding file sync status."""
        return '%s (%s)' % (MockDBusClient.status['description'],
                            MockDBusClient.status['name'])

    @inlineCallbacks
00815     def assert_correct_status(self, status, msg=None):
        """Check that the resulting status is correct."""
        expected = {MSG_KEY: self._build_msg() if msg is None else msg,
                    STATUS_KEY: status}
        result = yield self.be.file_sync_status()
        self.assertEqual(expected, result)

    @inlineCallbacks
00823     def test_disabled(self):
        """The syncdaemon status is processed and emitted."""
        self.patch(MockDBusClient, 'file_sync', False)
        yield self.assert_correct_status(FILE_SYNC_DISABLED, msg='')
        self.assertTrue(self.be.file_sync_disabled)

    @inlineCallbacks
00830     def test_error(self):
        """The syncdaemon status is processed and emitted."""
        MockDBusClient.status = {
            'is_error': 'True',  # nothing else matters
            'is_online': '', 'is_connected': '',
            'name': 'AUTH_FAILED', 'connection': '', 'queues': '',
            'description': 'auth failed',
        }
        yield self.assert_correct_status(FILE_SYNC_ERROR)
        # self.be.file_sync_disabled does not change
        self.assertEqual(self.was_disabled, self.be.file_sync_disabled)

    @inlineCallbacks
00843     def test_starting_when_init_not_user(self):
        """The syncdaemon status is processed and emitted."""
        MockDBusClient.status = {
            'is_error': '', 'is_online': '', 'is_connected': '',
            'connection': 'Not User With Network', 'queues': '',
            'name': 'INIT', 'description': 'something new',
        }
        yield self.assert_correct_status(FILE_SYNC_STARTING)
        self.assertFalse(self.be.file_sync_disabled)

    @inlineCallbacks
00854     def test_starting_when_init_with_user(self):
        """The syncdaemon status is processed and emitted."""
        MockDBusClient.status = {
            'is_error': '', 'is_online': '', 'is_connected': '',
            'connection': 'With User With Network', 'queues': '',
            'name': 'INIT', 'description': 'something new',
        }
        yield self.assert_correct_status(FILE_SYNC_STARTING)
        self.assertFalse(self.be.file_sync_disabled)

    @inlineCallbacks
00865     def test_starting_when_local_rescan_not_user(self):
        """The syncdaemon status is processed and emitted."""
        MockDBusClient.status = {
            'is_error': '', 'is_online': '', 'is_connected': '',
            'connection': 'Not User With Network', 'queues': '',
            'name': 'LOCAL_RESCAN', 'description': 'something new',
        }
        yield self.assert_correct_status(FILE_SYNC_STARTING)
        self.assertFalse(self.be.file_sync_disabled)

    @inlineCallbacks
00876     def test_starting_when_local_rescan_with_user(self):
        """The syncdaemon status is processed and emitted."""
        MockDBusClient.status = {
            'is_error': '', 'is_online': '', 'is_connected': '',
            'connection': 'With User With Network', 'queues': '',
            'name': 'LOCAL_RESCAN', 'description': 'something new',
        }
        yield self.assert_correct_status(FILE_SYNC_STARTING)
        self.assertFalse(self.be.file_sync_disabled)

    @inlineCallbacks
00887     def test_starting_when_ready_with_user(self):
        """The syncdaemon status is processed and emitted."""
        MockDBusClient.status = {
            'is_error': '', 'is_online': '', 'is_connected': '',
            'connection': 'With User With Network', 'queues': '',
            'name': 'READY', 'description': 'something nicer',
        }
        yield self.assert_correct_status(FILE_SYNC_STARTING)
        self.assertFalse(self.be.file_sync_disabled)

    @inlineCallbacks
00898     def test_disconnected(self):
        """The syncdaemon status is processed and emitted."""
        MockDBusClient.status = {
            'is_error': '', 'is_online': '', 'is_connected': '',
            'queues': '', 'description': 'something new',
            'connection': 'Not User With Network',  # user didn't connect
            'name': 'READY',  # must be READY, otherwise is STARTING
        }
        yield self.assert_correct_status(FILE_SYNC_DISCONNECTED)

        # self.be.file_sync_disabled does not change
        self.assertEqual(self.was_disabled, self.be.file_sync_disabled)

    @inlineCallbacks
00912     def test_disconnected_when_waiting(self):
        """The syncdaemon status is processed and emitted."""
        MockDBusClient.status = {
            'is_error': '', 'is_online': '', 'is_connected': '',
            'connection': 'With User With Network', 'queues': '',
            'name': 'WAITING', 'description': 'what a long wait!',
        }
        yield self.assert_correct_status(FILE_SYNC_DISCONNECTED)

        # self.be.file_sync_disabled does not change
        self.assertEqual(self.was_disabled, self.be.file_sync_disabled)

    @inlineCallbacks
00925     def test_syncing_if_online(self):
        """The syncdaemon status is processed and emitted."""
        MockDBusClient.status = {
            'is_error': '', 'is_online': 'True', 'is_connected': 'True',
            'name': 'QUEUE_MANAGER', 'connection': '',
            'queues': 'WORKING_ON_CONTENT',  # anything but IDLE
            'description': 'something nicer',
        }
        yield self.assert_correct_status(FILE_SYNC_SYNCING)

        # self.be.file_sync_disabled does not change
        self.assertEqual(self.was_disabled, self.be.file_sync_disabled)

    @inlineCallbacks
00939     def test_syncing_even_if_not_online(self):
        """The syncdaemon status is processed and emitted."""
        MockDBusClient.status = {
            'is_error': '', 'is_online': '', 'is_connected': 'True',
            'name': 'CHECK_VERSION', 'connection': '',
            'queues': 'WORKING_ON_CONTENT',
            'description': 'something nicer',
        }
        yield self.assert_correct_status(FILE_SYNC_SYNCING)

        # self.be.file_sync_disabled does not change
        self.assertEqual(self.was_disabled, self.be.file_sync_disabled)

    @inlineCallbacks
00953     def test_idle(self):
        """The syncdaemon status is processed and emitted."""
        MockDBusClient.status = {
            'is_error': '', 'is_online': 'True', 'is_connected': 'True',
            'name': 'QUEUE_MANAGER', 'connection': '',
            'queues': 'IDLE',
            'description': 'something nice',
        }
        yield self.assert_correct_status(FILE_SYNC_IDLE)

        # self.be.file_sync_disabled does not change
        self.assertEqual(self.was_disabled, self.be.file_sync_disabled)

    @inlineCallbacks
00967     def test_stopped(self):
        """The syncdaemon status is processed and emitted."""
        MockDBusClient.status = {
            'is_error': '', 'is_online': '', 'is_connected': '',
            'name': 'SHUTDOWN', 'connection': '',
            'queues': 'IDLE',
            'description': 'something nice',
        }
        yield self.assert_correct_status(FILE_SYNC_STOPPED)

        # self.be.file_sync_disabled does not change
        self.assertEqual(self.was_disabled, self.be.file_sync_disabled)

    @inlineCallbacks
00981     def test_unknown(self):
        """The syncdaemon status is processed and emitted."""
        MockDBusClient.status = {
            'is_error': '', 'is_online': '', 'is_connected': '',
            'name': '', 'connection': '', 'queues': '',
            'description': '',
        }
        yield self.assert_correct_status(FILE_SYNC_UNKNOWN)

        has_warning = self.memento.check_warning('file_sync_status: unknown',
                                                 repr(MockDBusClient.status))
        self.assertTrue(has_warning)

        # self.be.file_sync_disabled does not change
        self.assertEqual(self.was_disabled, self.be.file_sync_disabled)

00997     def test_status_changed(self):
        """The file_sync_status is the status changed handler."""
        self.be.status_changed_handler = self._set_called
        status = {'name': 'foo', 'description': 'bar', 'is_error': '',
                  'is_connected': '', 'is_online': '', 'queues': ''}
        # pylint: disable=E1101
        backend.dbus_client.status_changed_handler(status)

        # pylint: disable=W0212
        # Access to a protected member _process_file_sync_status
        expected_status = self.be._process_file_sync_status(status)
        self.assertEqual(self._called, ((expected_status,), {}))


class BackendSyncStatusIfDisabledTestCase(BackendSyncStatusTestCase):
    """Run the sync status tests with file sync initially disabled."""

    was_disabled = True

    @inlineCallbacks
    def assert_correct_status(self, status, msg=None):
        """Expect FILE_SYNC_DISABLED unless 'status' is FILE_SYNC_STARTING.

        FILE_SYNC_STARTING is checked as requested, with the given 'msg';
        any other status is replaced by FILE_SYNC_DISABLED with an empty
        message before delegating to the parent check.
        """
        parent_check = super(BackendSyncStatusIfDisabledTestCase,
                             self).assert_correct_status
        if status == FILE_SYNC_STARTING:
            yield parent_check(status, msg=msg)
        else:
            yield parent_check(FILE_SYNC_DISABLED, msg='')


class BackendFileSyncOpsTestCase(BackendBasicTestCase):
    """Exercise the backend operations that drive the file sync service."""

    def setUp(self):
        """Start each test with an empty record of syncdaemon actions."""
        super(BackendFileSyncOpsTestCase, self).setUp()
        MockDBusClient.actions = []

    def _assert_performed(self, *expected_actions):
        """Check the recorded actions and that file sync is not disabled."""
        self.assertEqual(MockDBusClient.actions, list(expected_actions))
        self.assertFalse(self.be.file_sync_disabled)

    @inlineCallbacks
    def test_enable_files(self):
        """Enabling after a disable turns the files service back on."""
        yield self.be.disable_files()

        yield self.be.enable_files()
        self.assertTrue(MockDBusClient.file_sync)
        self.assertFalse(self.be.file_sync_disabled)

    @inlineCallbacks
    def test_disable_files(self):
        """Disabling after an enable turns the files service off."""
        yield self.be.enable_files()

        yield self.be.disable_files()
        self.assertFalse(MockDBusClient.file_sync)
        self.assertTrue(self.be.file_sync_disabled)

    @inlineCallbacks
    def test_connect_files(self):
        """connect_files performs a single 'connect' action."""
        yield self.be.connect_files()

        self._assert_performed('connect')

    @inlineCallbacks
    def test_disconnect_files(self):
        """disconnect_files performs a single 'disconnect' action."""
        yield self.be.disconnect_files()

        self._assert_performed('disconnect')

    @inlineCallbacks
    def test_restart_files(self):
        """restart_files performs 'stop' followed by 'start'."""
        yield self.be.restart_files()

        self._assert_performed('stop', 'start')

    @inlineCallbacks
    def test_start_files(self):
        """start_files performs a single 'start' action."""
        yield self.be.start_files()

        self._assert_performed('start')

    @inlineCallbacks
    def test_stop_files(self):
        """stop_files performs a single 'stop' action."""
        yield self.be.stop_files()

        self._assert_performed('stop')


class BackendReplicationsTestCase(BackendBasicTestCase):
    """Replications tests for the backend."""

    def _add_replication(self, rid):
        """Make 'rid' a known replication for the current test only."""
        MockReplicationClient.replications.add(rid)
        self.addCleanup(lambda: MockReplicationClient.replications.remove(rid))

    @inlineCallbacks
    def test_replications_info(self):
        """replications_info describes every replication known to the client.

        Each expected item carries the replication id and name, whether it is
        enabled (i.e. not listed in the exclusions) and the package it
        depends on, if any.
        """
        result = yield self.be.replications_info()

        # replications_info will use exclusions information
        expected = []
        for name in MockReplicationClient.replications:
            enabled = bool_str(name not in MockReplicationClient.exclusions)
            dependency = ''
            if name == MockReplicationClient.BOOKMARKS:
                dependency = backend.BOOKMARKS_PKG
            elif name == MockReplicationClient.CONTACTS:
                dependency = backend.CONTACTS_PKG

            item = {'replication_id': name, 'name': name,
                    'enabled': enabled, 'dependency': dependency}
            expected.append(item)

        def by_id(item):
            """Sort key: dicts themselves have no portable ordering."""
            return item['replication_id']

        # Order is irrelevant, so compare both lists sorted by replication id
        # rather than relying on how the interpreter happens to order dicts.
        self.assertEqual(sorted(expected, key=by_id),
                         sorted(result, key=by_id))

    @inlineCallbacks
    def test_change_replication_settings(self):
        """Toggling 'enabled' adds and removes the id from the exclusions."""
        rid = '0123-4567'
        self._add_replication(rid)

        yield self.be.change_replication_settings(rid, {'enabled': ''})
        self.assertIn(rid, MockReplicationClient.exclusions)

        yield self.be.change_replication_settings(rid, {'enabled': 'True'})
        self.assertNotIn(rid, MockReplicationClient.exclusions)

    @inlineCallbacks
    def test_change_replication_settings_not_in_replications(self):
        """The settings can not be changed for an item not in replications."""
        rid = '0123-4567'
        # Precondition of the test; a real assertion so it is never skipped
        # when Python runs with optimizations enabled.
        self.assertNotIn(rid, MockReplicationClient.replications)

        d = self.be.change_replication_settings(rid, {'enabled': 'True'})
        yield self.assertFailure(d, replication_client.ReplicationError)

        d = self.be.change_replication_settings(rid, {'enabled': ''})
        yield self.assertFailure(d, replication_client.ReplicationError)

    @inlineCallbacks
    def test_change_replication_settings_no_setting(self):
        """Empty settings are accepted and leave the exclusions untouched."""
        rid = '0123-4567'
        self._add_replication(rid)

        prior = MockReplicationClient.exclusions.copy()
        yield self.be.change_replication_settings(rid, {})

        self.assertEqual(MockReplicationClient.exclusions, prior)

# Generated by Doxygen 1.6.0 (footer of the HTML source listing; back to index)