Logo Search packages:      
Sourcecode: ubuntuone-control-panel version File versions  Download package

test_dbus_client_sd.py

# -*- coding: utf-8 -*-

# Authors: Alejandro J. Cura <alecu@canonical.com>
# Authors: Natalia B. Bidart <natalia.bidart@canonical.com>
#
# Copyright 2010 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Tests for the DBus service when accessing SyncDaemon."""

import uuid

import dbus

from twisted.internet.defer import inlineCallbacks, returnValue

from ubuntuone.controlpanel import dbus_client
from ubuntuone.controlpanel.dbus_client import sd_dbus_iface
from ubuntuone.controlpanel.integrationtests import (TestDBusException,
    DBusClientTestCase)


SAMPLE_LIMITS = {
    "upload": "999",
    "download": "838",
}

SHARES = {}

# pylint, you have to go to decorator's school
# pylint: disable=C0322

# Access to a protected member of a client class
# pylint: disable=W0212


00048 class ConfigMockDBusSyncDaemon(dbus.service.Object):
    """A mock object that mimicks syncdaemon regarding the config iface."""

    show_all_notifications = True
    throttling = False
    limits = {"download": -1, "upload": -1}

    @dbus.service.method(dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME,
                         in_signature="", out_signature="a{si}")
00057     def get_throttling_limits(self):
        """Return the throttling dict."""
        return self.limits

    @dbus.service.method(in_signature="ii",
               dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME)
00063     def set_throttling_limits(self, download, upload):
        """Set the throttling dict."""
        self.limits["download"] = download
        self.limits["upload"] = upload

    @dbus.service.method(in_signature="", out_signature="b",
              dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME)
00070     def bandwidth_throttling_enabled(self):
        """Get the state of bw throttling."""
        return self.throttling

    @dbus.service.method(dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME)
00075     def enable_bandwidth_throttling(self):
        """Enable bw throttling."""
        self.throttling = True

    @dbus.service.method(dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME)
00080     def disable_bandwidth_throttling(self):
        """Disable bw throttling."""
        self.throttling = False

    @dbus.service.method(in_signature="", out_signature="b",
              dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME)
00086     def show_all_notifications_enabled(self):
        """Get the state of show_all_notifications."""
        return self.show_all_notifications

    @dbus.service.method(dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME)
00091     def enable_show_all_notifications(self):
        """Enable show_all_notifications."""
        self.show_all_notifications = True

    @dbus.service.method(dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME)
00096     def disable_show_all_notifications(self):
        """Disable show_all_notifications."""
        self.show_all_notifications = False


00101 class ConfigMockDBusSyncDaemonFailing(dbus.service.Object):
    """A mock object that mimicks syncdaemon but fails."""

    @dbus.service.method(dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME,
                         in_signature="", out_signature="a{si}")
00106     def get_throttling_limits(self):
        """Fail while getting the throttling dict."""
        raise TestDBusException()

    @dbus.service.method(in_signature="ii",
               dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME)
00112     def set_throttling_limits(self, download, upload):
        """Fail while setting the throttling dict."""
        raise TestDBusException()

    @dbus.service.method(in_signature="", out_signature="b",
              dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME)
00118     def bandwidth_throttling_enabled(self):
        """Get the state of bw throttling."""
        raise TestDBusException()

    @dbus.service.method(dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME)
00123     def enable_bandwidth_throttling(self):
        """Enable bw throttling."""
        raise TestDBusException()

    @dbus.service.method(dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME)
00128     def disable_bandwidth_throttling(self):
        """Disable bw throttling."""
        raise TestDBusException()

    @dbus.service.method(in_signature="", out_signature="b",
              dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME)
00134     def show_all_notifications_enabled(self):
        """Get the state of show_all_notifications."""
        raise TestDBusException()

    @dbus.service.method(dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME)
00139     def enable_show_all_notifications(self):
        """Enable show_all_notifications."""
        raise TestDBusException()

    @dbus.service.method(dbus_interface=sd_dbus_iface.DBUS_IFACE_CONFIG_NAME)
00144     def disable_show_all_notifications(self):
        """Disable show_all_notifications."""
        raise TestDBusException()


00149 class ThrottlingTestCase(DBusClientTestCase):
    """Test for the throttling dbus client methods."""

    @inlineCallbacks
00153     def test_throttle_limits_returned(self):
        """When SD returns the limits, they are handled."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemon)
        limits = yield dbus_client.get_throttling_limits()
        self.assertEqual(limits, ConfigMockDBusSyncDaemon.limits)

    @inlineCallbacks
00161     def test_getting_throttle_limits_throws_an_error(self):
        """Handle DBus error when getting the limits."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemonFailing)
        yield self.assertFailure(dbus_client.get_throttling_limits(),
                                 dbus.DBusException)

    @inlineCallbacks
00169     def test_throttle_limits_set(self):
        """Setting the limits."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemon)
        yield dbus_client.set_throttling_limits(SAMPLE_LIMITS)
        expected_result = {
            "download": int(SAMPLE_LIMITS["download"]),
            "upload": int(SAMPLE_LIMITS["upload"]),
        }
        self.assertEqual(self.mock.limits, expected_result)

    @inlineCallbacks
00181     def test_setting_throttle_limits_throws_an_error(self):
        """Handle DBus error when setting the limits."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemonFailing)
        d = dbus_client.set_throttling_limits(SAMPLE_LIMITS)
        yield self.assertFailure(d, dbus.DBusException)

    @inlineCallbacks
00189     def test_throttle_get(self):
        """Getting the throttling state."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemon)
        self.mock.throttling = False
        throttling = yield dbus_client.bandwidth_throttling_enabled()
        self.assertEqual(throttling, False)
        self.mock.throttling = True
        throttling = yield dbus_client.bandwidth_throttling_enabled()
        self.assertEqual(throttling, True)

    @inlineCallbacks
00201     def test_throttle_get_throws_an_error(self):
        """Handle DBus error when getting the throttling state."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemonFailing)
        d = dbus_client.bandwidth_throttling_enabled()
        yield self.assertFailure(d, dbus.DBusException)

    @inlineCallbacks
00209     def test_throttle_enable(self):
        """Enabling bw throttling."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemon)
        self.mock.throttling = False
        yield dbus_client.enable_bandwidth_throttling()
        self.assertEqual(self.mock.throttling, True)

    @inlineCallbacks
00218     def test_throttle_enable_throws_an_error(self):
        """Handle DBus error when enabling throttling."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemonFailing)
        d = dbus_client.enable_bandwidth_throttling()
        yield self.assertFailure(d, dbus.DBusException)

    @inlineCallbacks
00226     def test_throttle_disable(self):
        """Disabling bw throttling."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemon)
        self.mock.throttling = True
        yield dbus_client.disable_bandwidth_throttling()
        self.assertEqual(self.mock.throttling, False)

    @inlineCallbacks
00235     def test_throttle_disable_throws_an_error(self):
        """Handle DBus error when disabling throttling."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemonFailing)
        d = dbus_client.disable_bandwidth_throttling()
        yield self.assertFailure(d, dbus.DBusException)


00243 class NotificationsTestCase(DBusClientTestCase):
    """Test for the show_all_notifications dbus client methods."""

    @inlineCallbacks
00247     def test_notifications_get(self):
        """Getting the show_all_notifications state."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemon)
        self.mock.show_all_notifications = False
        state = yield dbus_client.show_all_notifications_enabled()
        self.assertEqual(state, False)
        self.mock.show_all_notifications = True
        state = yield dbus_client.show_all_notifications_enabled()
        self.assertEqual(state, True)

    @inlineCallbacks
00259     def test_notifications_get_throws_an_error(self):
        """Handle DBus error when getting the show_all_notifications state."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemonFailing)
        d = dbus_client.show_all_notifications_enabled()
        yield self.assertFailure(d, dbus.DBusException)

    @inlineCallbacks
00267     def test_notifications_enable(self):
        """Enabling show_all_notifications."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemon)
        self.mock.show_all_notifications = False
        yield dbus_client.enable_show_all_notifications()
        self.assertEqual(self.mock.show_all_notifications, True)

    @inlineCallbacks
00276     def test_notifications_enable_throws_an_error(self):
        """Handle DBus error when enabling show_all_notifications."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemonFailing)
        d = dbus_client.enable_show_all_notifications()
        yield self.assertFailure(d, dbus.DBusException)

    @inlineCallbacks
00284     def test_notifications_disable(self):
        """Disabling show_all_notifications."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemon)
        self.mock.show_all_notifications = True
        yield dbus_client.disable_show_all_notifications()
        self.assertEqual(self.mock.show_all_notifications, False)

    @inlineCallbacks
00293     def test_notifications_disable_throws_an_error(self):
        """Handle DBus error when disabling show_all_notifications."""
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/config", ConfigMockDBusSyncDaemonFailing)
        d = dbus_client.disable_show_all_notifications()
        yield self.assertFailure(d, dbus.DBusException)


00301 class FoldersMockDBusSyncDaemon(sd_dbus_iface.Folders):
    """A mock object that mimicks syncdaemon regarding the Folders iface."""

    # __init__ method from a non direct base class 'Object' is called
    # __init__ method from base class 'Folders' is not called
    # pylint: disable=W0231, W0233

    def __init__(self, object_path, conn):
        self.udfs = {}
        self.udf_id = 1
        dbus.service.Object.__init__(self,
                                     object_path=object_path, conn=conn)

    @classmethod
00315     def _new_udf(cls, uid, path, subscribed=False):
        """Create a new faked udf."""
        udf = {}
        if isinstance(path, str):
            path = path.decode('utf-8')
        udf[u'path'] = udf[u'suggested_path'] = path
        udf[u'volume_id'] = unicode(uid)
        udf[u'subscribed'] = sd_dbus_iface.bool_str(subscribed)
        udf[u'node_id'] = u'a18f4cbd-a846-4405-aaa1-b28904817089'
        udf[u'generation'] = u''
        udf[u'type'] = u'UDF'

        return udf

    @dbus.service.method(sd_dbus_iface.DBUS_IFACE_FOLDERS_NAME,
                         in_signature='s')
00331     def create(self, path):
        """Create a user defined folder in the specified path."""
        if path == '':  # simulate an error
            self.emit_folder_create_error(path, 'create failed!')
            return

        udf = self._new_udf(self.udf_id, path)
        self.udfs[udf['volume_id']] = udf
        self.udf_id += 1
        self.emit_folder_created(udf)

    @dbus.service.method(sd_dbus_iface.DBUS_IFACE_FOLDERS_NAME,
                         in_signature='s')
00344     def delete(self, folder_id):
        """Delete the folder specified by folder_id"""
        if folder_id not in self.udfs:
            self.emit_folder_delete_error(folder_id, 'failed!')
            return

        udf = self.udfs.pop(folder_id)
        self.emit_folder_deleted(udf)

    @dbus.service.method(sd_dbus_iface.DBUS_IFACE_FOLDERS_NAME,
                         out_signature='aa{ss}')
00355     def get_folders(self):
        """Return the list of folders (a list of dicts)"""
        return [sd_dbus_iface.get_udf_dict(udf) for udf in self.udfs.values()]

    @dbus.service.method(sd_dbus_iface.DBUS_IFACE_FOLDERS_NAME,
                         in_signature='s', out_signature='')
00361     def subscribe(self, folder_id):
        """Subscribe to the specified folder"""
        if folder_id not in self.udfs:
            # simulate error
            self.emit_folder_subscribe_error(folder_id, 'some error')
            return

        self.udfs[folder_id]['subscribed'] = sd_dbus_iface.bool_str(True)
        self.emit_folder_subscribed(self.udfs[folder_id])

    @dbus.service.method(sd_dbus_iface.DBUS_IFACE_FOLDERS_NAME,
                         in_signature='s', out_signature='')
00373     def unsubscribe(self, folder_id):
        """Unsubscribe from the specified folder"""
        if folder_id not in self.udfs:
            # simulate error
            self.emit_folder_unsubscribe_error(folder_id, 'some error')
            return

        self.udfs[folder_id]['subscribed'] = sd_dbus_iface.bool_str(False)
        self.emit_folder_unsubscribed(self.udfs[folder_id])

    @dbus.service.method(sd_dbus_iface.DBUS_IFACE_FOLDERS_NAME,
                         in_signature='s', out_signature='a{ss}')
00385     def get_info(self, path):
        """Override get_info to avoid accessing filesystem_manager."""
        for fid, content in self.udfs.iteritems():
            if content['path'] == path:
                return content
        else:
            return {}

    @dbus.service.method(sd_dbus_iface.DBUS_IFACE_FOLDERS_NAME,
                         in_signature='', out_signature='')
00395     def refresh_volumes(self):
        """Refresh the volumes list, requesting it to the server."""


00399 class FoldersTestCase(DBusClientTestCase):
    """Test for the shares dbus client methods."""

    def setUp(self):
        super(FoldersTestCase, self).setUp()
        self.patch(sd_dbus_iface, 'get_udf_dict', lambda udf: udf)
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/folders", FoldersMockDBusSyncDaemon)

    @inlineCallbacks
00409     def test_get_folders(self):
        """Retrieve folders info list."""
        path = '~/bar/baz'
        yield dbus_client.create_folder(path)

        result = yield dbus_client.get_folders()

        expected = FoldersMockDBusSyncDaemon._new_udf(1, path)
        self.assertEqual(result, [sd_dbus_iface.get_udf_dict(expected)])

    @inlineCallbacks
00420     def test_get_folders_error(self):
        """Handle error when retrieving current syncdaemon status."""
        path = '~/bar/baz'
        yield dbus_client.create_folder(path)

        def fail(value):
            """Fake an error."""
            raise TestDBusException(value)

        self.patch(sd_dbus_iface, 'get_udf_dict', fail)

        try:
            yield dbus_client.get_folders()
        except dbus.DBusException:
            pass  # test passes!
        else:
            self.fail('dbus_client.get_folders should be errbacking')

    @inlineCallbacks
00439     def test_create_folder(self):
        """Create a new folder."""
        path = '~/bar/baz'
        folder_info = yield dbus_client.create_folder(path)

        expected = FoldersMockDBusSyncDaemon._new_udf(1, path)
        self.assertEqual(sd_dbus_iface.get_udf_dict(expected), folder_info)

    @inlineCallbacks
00448     def test_create_folder_error(self):
        """Create a new folder fails."""
        path = ''
        try:
            yield dbus_client.create_folder(path=path)
        except dbus_client.VolumesError, e:
            self.assertEqual(e[0], {'path': path})
        else:
            self.fail('dbus_client.create_folder should be errbacking')

    @inlineCallbacks
00459     def test_subscribe_folder(self):
        """Subscribe to a folder."""
        path = '~/bar/baz'
        folder_info = yield dbus_client.create_folder(path)
        fid = folder_info['volume_id']
        yield dbus_client.subscribe_folder(fid)

        result = yield dbus_client.get_folders()
        expected, = filter(lambda folder: folder['volume_id'] == fid, result)
        self.assertEqual(expected['subscribed'], 'True')

    @inlineCallbacks
00471     def test_subscribe_folder_error(self):
        """Subscribe to a folder."""
        fid = u'does not exist'
        try:
            yield dbus_client.subscribe_folder(fid)
        except dbus_client.VolumesError, e:
            self.assertEqual(e[0], fid)
        else:
            self.fail('dbus_client.subscribe_folder should be errbacking')

    @inlineCallbacks
00482     def test_unsubscribe_folder(self):
        """Unsubscribe to a folder."""
        path = '~/bar/baz'
        folder_info = yield dbus_client.create_folder(path)
        fid = folder_info['volume_id']
        yield dbus_client.subscribe_folder(fid)
        # folder is subscribed

        yield dbus_client.unsubscribe_folder(fid)

        result = yield dbus_client.get_folders()
        expected, = filter(lambda folder: folder['volume_id'] == fid, result)
        self.assertEqual(expected['subscribed'], '')

    @inlineCallbacks
00497     def test_unsubscribe_folder_error(self):
        """Unsubscribe to a folder."""
        fid = u'does not exist'
        try:
            yield dbus_client.unsubscribe_folder(fid)
        except dbus_client.VolumesError, e:
            self.assertEqual(e[0], fid)
        else:
            self.fail('dbus_client.unsubscribe_folder should be errbacking')


00508 class FakedSyncDaemonTool(object):
    """Fake the FakedSyncDaemonTool."""

00511     def __init__(self, bus):
        """New instance."""

00514     def _set_share_attr(self, share_id, attr, value):
        """Set values to shares."""
        if share_id not in SHARES:
            raise dbus.DBusException('share_id not in SHARES.')

        value = sd_dbus_iface.bool_str(value)
        SHARES[share_id][attr] = value
        return share_id

    @inlineCallbacks
00524     def accept_share(self, share_id):
        """Accept the share with id: share_id."""
        yield self._set_share_attr(share_id, u'accepted', True)

    @inlineCallbacks
00529     def reject_share(self, share_id):
        """Reject the share with id: share_id."""
        yield self._set_share_attr(share_id, u'accepted', False)

    @inlineCallbacks
00534     def subscribe_share(self, share_id):
        """Subscribe to a share given its id."""
        yield self._set_share_attr(share_id, u'subscribed', True)

    @inlineCallbacks
00539     def unsubscribe_share(self, share_id):
        """Unsubscribe from a share given its id."""
        yield self._set_share_attr(share_id, u'subscribed', False)

    @inlineCallbacks
00544     def get_shares(self):
        """Get the list of shares (accepted or not)."""
        result = yield SHARES.values()
        returnValue(sorted(result))

    @inlineCallbacks
00550     def refresh_shares(self):
        """Call refresh_shares method via DBus."""
        yield

    @classmethod
00555     def create_share(cls, name, accepted=False, subscribed=False):
        """Add a new share (fake)."""
        sid = unicode(uuid.uuid4())
        path = u'/home/tester/.hidden/shares/%s from The Othr User' % name
        share = {
            u'name': name, u'subscribed': sd_dbus_iface.bool_str(subscribed),
            u'accepted': sd_dbus_iface.bool_str(accepted),
            u'generation': u'36', u'type': u'Share',
            u'node_id': u'ca3a1cec-09d2-485e-9685-1a5180bd6441',
            u'volume_id': sid, u'access_level': u'View',
            u'other_username': u'https://login.ubuntu.com/+id/nHRnYmz',
            u'other_visible_name': u'The Other User',
            u'free_bytes': u'2146703403', u'path': path,
        }
        SHARES[sid] = share
        return share


00573 class SharesTestCase(DBusClientTestCase):
    """Test for the shares dbus client methods."""

    def setUp(self):
        super(SharesTestCase, self).setUp()
        self.patch(dbus_client, 'SyncDaemonTool', FakedSyncDaemonTool)
        SHARES.clear()

    @inlineCallbacks
00582     def test_get_shares(self):
        """Retrieve shares info list."""
        share1 = FakedSyncDaemonTool.create_share(name='first, test me!')
        share2 = FakedSyncDaemonTool.create_share(name='and test me more!',
                                                accepted=True)
        share3 = FakedSyncDaemonTool.create_share(name='last but not least',
                                                accepted=True, subscribed=True)

        result = yield dbus_client.get_shares()

        self.assertEqual(result, sorted([share1, share2, share3]))

    @inlineCallbacks
00595     def test_get_shares_error(self):
        """Handle error when retrieving current syncdaemon status."""
        self.patch(FakedSyncDaemonTool, 'get_shares', self.fail)
        try:
            yield dbus_client.get_shares()
        except:  # pylint: disable=W0702
            pass  # test passes!
        else:
            self.fail('dbus_client.get_shares should be errbacking')

    @inlineCallbacks
00606     def test_subscribe_share(self):
        """Subscribe to a share."""
        share = FakedSyncDaemonTool.create_share(name='to be subscribed',
                                               accepted=True, subscribed=False)
        sid = share['volume_id']

        yield dbus_client.subscribe_share(sid)

        result = yield dbus_client.get_shares()
        expected, = filter(lambda share: share['volume_id'] == sid, result)
        self.assertEqual(expected['subscribed'], 'True')

    @inlineCallbacks
00619     def test_subscribe_share_error(self):
        """Subscribe to a share."""
        sid = u'does not exist'
        try:
            yield dbus_client.subscribe_share(sid)
        except dbus_client.VolumesError, e:
            self.assertEqual(e[0], sid)
        else:
            self.fail('dbus_client.subscribe_share should be errbacking')

    @inlineCallbacks
00630     def test_unsubscribe_share(self):
        """Unsubscribe to a share."""
        share = FakedSyncDaemonTool.create_share(name='to be unsubscribed',
                                               accepted=True, subscribed=True)
        sid = share['volume_id']
        yield dbus_client.subscribe_share(sid)
        # share is subscribed

        yield dbus_client.unsubscribe_share(sid)

        result = yield dbus_client.get_shares()
        expected, = filter(lambda share: share['volume_id'] == sid, result)
        self.assertEqual(expected['subscribed'], '')

    @inlineCallbacks
00645     def test_unsubscribe_share_error(self):
        """Unsubscribe to a share."""
        sid = u'does not exist'
        try:
            yield dbus_client.unsubscribe_share(sid)
        except dbus_client.VolumesError, e:
            self.assertEqual(e[0], sid)
        else:
            self.fail('dbus_client.unsubscribe_share should be errbacking')


00656 class StatusMockDBusSyncDaemon(dbus.service.Object):
    """A mock object that mimicks syncdaemon regarding the Status iface."""

    state_dict = {
        'name': 'TEST',
        'description': 'Some test state, nothing else.',
        'is_error': '',
        'is_connected': 'True',
        'is_online': '',
        'queues': 'GORGEOUS',
        'connection': '',
    }

00669     def _get_current_state(self):
        """Get the current status of the system."""
        return self.state_dict

    @dbus.service.method(sd_dbus_iface.DBUS_IFACE_STATUS_NAME,
                         in_signature='', out_signature='a{ss}')
00675     def current_status(self):
        """Return the current faked status of the system."""
        return self._get_current_state()

    # pylint: disable=C0103
    # Invalid name "StatusChanged"

    @dbus.service.signal(sd_dbus_iface.DBUS_IFACE_STATUS_NAME)
00683     def StatusChanged(self, status):
        """Fire a signal to notify that the status of the system changed."""

00686     def emit_status_changed(self, state=None):
        """Emit StatusChanged."""
        self.StatusChanged(self._get_current_state())


00691 class StatusTestCase(DBusClientTestCase):
    """Test for the status dbus client methods."""

    def setUp(self):
        super(StatusTestCase, self).setUp()
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/status", StatusMockDBusSyncDaemon)

    @inlineCallbacks
00700     def test_get_current_status(self):
        """Retrieve current syncdaemon status."""
        status = yield dbus_client.get_current_status()

        self.assertEqual(StatusMockDBusSyncDaemon.state_dict, status)

    @inlineCallbacks
00707     def test_get_current_status_error(self):
        """Handle error when retrieving current syncdaemon status."""

        def fail(value):
            """Fake an error."""
            raise TestDBusException(value)

        self.patch(StatusMockDBusSyncDaemon, '_get_current_state', fail)

        try:
            yield dbus_client.get_current_status()
        except dbus.DBusException:
            pass  # test passes!
        else:
            self.fail('dbus_client.get_current_status should be errbacking')

00723     def test_set_status_changed_handler(self):
        """A proper callback can be connected to StatusChanged signal."""
        _, sig = dbus_client.set_status_changed_handler(self._set_called)

        self.assertEqual(sig._handler, self._set_called)
        self.assertEqual(sig._member, 'StatusChanged')
        self.assertEqual(sig._path, '/status')
        self.assertEqual(sig._interface, sd_dbus_iface.DBUS_IFACE_STATUS_NAME)


00733 class FileSyncTestCase(DBusClientTestCase):
    """Test for the files sync enabled dbus client methods."""

    @inlineCallbacks
00737     def test_files_sync_enabled(self):
        """Retrieve whether file sync is enabled."""
        expected = object()
        self.patch(dbus_client.SyncDaemonTool, 'is_files_sync_enabled',
                   lambda _: expected)

        enabled = yield dbus_client.files_sync_enabled()

        self.assertEqual(expected, enabled)

    @inlineCallbacks
00748     def test_set_files_sync_enabled(self):
        """Set if file sync is enabled or not."""
        self.patch(dbus_client.SyncDaemonTool, 'enable_files_sync',
                   self._set_called)
        expected = object()
        # set some unique value
        yield dbus_client.set_files_sync_enabled(expected)

        self.assertEqual(self._called, ((expected,), {}))

    @inlineCallbacks
00759     def test_connect_file_sync(self):
        """Set if file sync is enabled or not."""
        self.patch(dbus_client.SyncDaemonTool, 'connect', self._set_called)
        yield dbus_client.connect_file_sync()

        self.assertEqual(self._called, ((), {}))

    @inlineCallbacks
00767     def test_disconnect_file_sync(self):
        """Set if file sync is enabled or not."""
        self.patch(dbus_client.SyncDaemonTool, 'disconnect', self._set_called)
        yield dbus_client.disconnect_file_sync()

        self.assertEqual(self._called, ((), {}))

    @inlineCallbacks
00775     def test_start_file_sync(self):
        """Set if file sync is enabled or not."""
        self.patch(dbus_client.SyncDaemonTool, 'start', self._set_called)
        yield dbus_client.start_file_sync()

        self.assertEqual(self._called, ((), {}))

    @inlineCallbacks
00783     def test_stop_file_sync(self):
        """Set if file sync is enabled or not."""
        self.patch(dbus_client.SyncDaemonTool, 'quit', self._set_called)
        yield dbus_client.stop_file_sync()

        self.assertEqual(self._called, ((), {}))


00791 class MockDBusSyncDaemon(dbus.service.Object):
    """A mock object that mimicks syncdaemon."""

    ROOT_DIR = u'/yadda/yoda/Test me'
    SHARES_DIR = u'/yadda/yoda/.hidden/ugly/dir/name/shares'
    SHARES_DIR_LINK = u'/yadda/yoda/Test me/Shared With Me'

    @dbus.service.method(sd_dbus_iface.DBUS_IFACE_SYNC_NAME,
                         in_signature='', out_signature='s')
00800     def get_rootdir(self):
        """Return the root dir/mount point."""
        return self.ROOT_DIR

    @dbus.service.method(sd_dbus_iface.DBUS_IFACE_SYNC_NAME,
                         in_signature='', out_signature='s')
00806     def get_sharesdir(self):
        """Return the shares dir/mount point."""
        return self.SHARES_DIR

    @dbus.service.method(sd_dbus_iface.DBUS_IFACE_SYNC_NAME,
                         in_signature='', out_signature='s')
00812     def get_sharesdir_link(self):
        """Return the root dir/mount point."""
        return self.SHARES_DIR_LINK


00817 class BasicTestCase(DBusClientTestCase):
    """Test for the basic dbus client methods."""

    def setUp(self):
        super(BasicTestCase, self).setUp()
        self.register_mockserver(sd_dbus_iface.DBUS_IFACE_NAME,
                                 "/", MockDBusSyncDaemon)

    @inlineCallbacks
00826     def test_get_root_dir(self):
        """Retrieve current syncdaemon root dir."""
        root = yield dbus_client.get_root_dir()

        self.assertEqual(MockDBusSyncDaemon.ROOT_DIR, root)

    @inlineCallbacks
00833     def test_get_shares_dir(self):
        """Retrieve current syncdaemon shares dir."""
        result = yield dbus_client.get_shares_dir()

        self.assertEqual(MockDBusSyncDaemon.SHARES_DIR, result)

    @inlineCallbacks
00840     def test_get_shares_dir_link(self):
        """Retrieve current syncdaemon shares dir."""
        result = yield dbus_client.get_shares_dir_link()

        self.assertEqual(MockDBusSyncDaemon.SHARES_DIR_LINK, result)

Generated by  Doxygen 1.6.0   Back to index