#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
#-------------------------------------------------------------------------------
"""
Unit test module for testing :py:mod:`mentat.dirq` module.
"""
import unittest
import os
import shutil
import mentat.dirq
#
# Global variables
#
DIRQ = '/tmp/dirq.tmpd' # Name of the test directory input queue
DIRD = '/tmp/dirq.out.tmpd' # Name of the test directory output queue
[docs]class TestDirectoryQueue(unittest.TestCase):
"""
Unit test class for testing :py:class:`mentat.dirq.DirectoryQueue` module.
"""
[docs] def setUp(self):
try:
qdir_name = DIRQ
os.mkdir(qdir_name)
ddir_name = DIRD
os.mkdir(ddir_name)
except FileExistsError:
pass
[docs] def tearDown(self):
shutil.rmtree(DIRQ)
shutil.rmtree(DIRD)
[docs] def test_01_basic(self):
"""
Perform the basic operativity tests.
"""
dqh = mentat.dirq.DirectoryQueue(DIRQ, dir_next_queue = DIRD)
# Check that all necessary subdirectories were created
self.assertTrue(os.path.isdir(os.path.join(DIRQ, 'incoming')))
self.assertTrue(os.path.isdir(os.path.join(DIRQ, 'pending')))
self.assertTrue(os.path.isdir(os.path.join(DIRQ, 'errors')))
self.assertTrue(os.path.isdir(os.path.join(DIRQ, 'tmp')))
self.assertTrue(os.path.isdir(os.path.join(DIRD, 'incoming')))
self.assertTrue(os.path.isdir(os.path.join(DIRD, 'pending')))
self.assertTrue(os.path.isdir(os.path.join(DIRD, 'errors')))
self.assertTrue(os.path.isdir(os.path.join(DIRD, 'tmp')))
self.assertTrue(os.path.isdir(DIRD))
msgid_01 = dqh.enqueue("TEST MESSAGE")
self.assertTrue(msgid_01)
self.assertEqual(dqh.count_incoming(), 1)
self.assertEqual(dqh.count_pending(), 0)
self.assertEqual(dqh.count_errors(), 0)
self.assertEqual(dqh.count_done(), 0)
(xid, xdata) = dqh.next()
self.assertEqual(xid, msgid_01)
self.assertEqual(xdata, "TEST MESSAGE")
self.assertEqual(dqh.count_incoming(), 0)
self.assertEqual(dqh.count_pending(), 1)
self.assertEqual(dqh.count_errors(), 0)
self.assertEqual(dqh.count_done(), 0)
dqh.banish(msgid_01)
self.assertEqual(dqh.count_incoming(), 0)
self.assertEqual(dqh.count_pending(), 0)
self.assertEqual(dqh.count_errors(), 1)
self.assertEqual(dqh.count_done(), 0)
msgid_02 = dqh.enqueue("TEST MESSAGE")
msgid_03 = dqh.enqueue("TEST MESSAGE")
self.assertEqual(dqh.count_incoming(), 2)
self.assertEqual(dqh.count_pending(), 0)
self.assertEqual(dqh.count_errors(), 1)
self.assertEqual(dqh.count_done(), 0)
(xid, xdata) = dqh.next()
(xid, xdata) = dqh.next()
self.assertEqual(dqh.count_incoming(), 0)
self.assertEqual(dqh.count_pending(), 2)
self.assertEqual(dqh.count_errors(), 1)
self.assertEqual(dqh.count_done(), 0)
dqh.commit(msgid_02)
self.assertEqual(dqh.count_incoming(), 0)
self.assertEqual(dqh.count_pending(), 1)
self.assertEqual(dqh.count_errors(), 1)
self.assertEqual(dqh.count_done(), 1)
dqh.commit(msgid_03)
self.assertEqual(dqh.count_incoming(), 0)
self.assertEqual(dqh.count_pending(), 0)
self.assertEqual(dqh.count_errors(), 1)
self.assertEqual(dqh.count_done(), 2)
msgid_04 = dqh.enqueue("TEST MESSAGE A")
msgid_05 = dqh.enqueue("TEST MESSAGE B")
msgid_06 = dqh.enqueue("TEST MESSAGE C")
(xid, xdata) = dqh.next()
(xid, xdata) = dqh.next()
(xid, xdata) = dqh.next()
dqh.update(msgid_04, "TEST MESSAGE D")
dqh.update(msgid_05, "TEST MESSAGE E")
dqh.update(msgid_06, "TEST MESSAGE F")
self.assertEqual(dqh.reload(msgid_04), "TEST MESSAGE D")
self.assertEqual(dqh.reload(msgid_05), "TEST MESSAGE E")
self.assertEqual(dqh.reload(msgid_06), "TEST MESSAGE F")
dqh.banish(msgid_04, {"error": "Something went really really wrong"})
self.assertEqual(dqh.count_pending(), 2)
self.assertEqual(dqh.count_errors(), 3)
self.assertEqual(dqh._load_file(os.path.join(dqh.dir_errors, "{}.meta".format(msgid_04))), '{\n "error": "Something went really really wrong"\n}') # pylint: disable=protected-access
dqh.dispatch(msgid_05, '/tmp')
self.assertEqual(dqh.count_pending(), 1)
self.assertTrue(os.path.isfile(os.path.join('/tmp', msgid_05)))
os.unlink(os.path.join('/tmp', msgid_05))
dqh.duplicate(msgid_06, '/tmp')
self.assertEqual(dqh.count_pending(), 1)
self.assertTrue(os.path.isfile(os.path.join('/tmp', msgid_06)))
os.unlink(os.path.join('/tmp', msgid_06))
dqh.cancel(msgid_06)
self.assertEqual(dqh.count_pending(), 0)
self.assertEqual(dqh.count_errors(), 3)
[docs] def test_02_paralel(self):
"""
Perform paralel queue manager tests.
"""
dq1 = mentat.dirq.DirectoryQueue(DIRQ)
dq2 = mentat.dirq.DirectoryQueue(DIRQ)
#msgid_01 = dq1.enqueue("TEST MESSAGE 1")
#msgid_02 = dq2.enqueue("TEST MESSAGE 2")
#msgid_03 = dq1.enqueue("TEST MESSAGE 3")
#msgid_04 = dq2.enqueue("TEST MESSAGE 4")
#msgid_05 = dq1.enqueue("TEST MESSAGE 5")
#msgid_06 = dq2.enqueue("TEST MESSAGE 6")
dq1.enqueue("TEST MESSAGE 1")
dq2.enqueue("TEST MESSAGE 2")
dq1.enqueue("TEST MESSAGE 3")
dq2.enqueue("TEST MESSAGE 4")
dq1.enqueue("TEST MESSAGE 5")
dq2.enqueue("TEST MESSAGE 6")
self.assertEqual(dq1.count_incoming(), 6)
self.assertEqual(dq2.count_incoming(), 6)
#(xid1, xdata1) = dq1.next()
#(xid2, xdata2) = dq2.next()
#(xid3, xdata3) = dq1.next()
#(xid4, xdata4) = dq2.next()
#(xid5, xdata5) = dq1.next()
#(xid6, xdata6) = dq2.next()
dq1.next()
dq2.next()
dq1.next()
dq2.next()
dq1.next()
dq2.next()
self.assertEqual(dq1.count_incoming(), 0)
self.assertEqual(dq2.count_incoming(), 0)
self.assertEqual(dq1.count_pending(), 6)
self.assertEqual(dq2.count_pending(), 6)
st1 = dq1.stats
self.assertEqual(st1['cnt_dequeued'], 3)
self.assertEqual(st1['cnt_skips'], 2)
#-------------------------------------------------------------------------------
if __name__ == "__main__":
unittest.main()