Skip to content

Commit

Permalink
Add basic testbed.
Browse files Browse the repository at this point in the history
Signed-off-by: hjpotter92 <[email protected]>
  • Loading branch information
hjpotter92 committed Feb 23, 2021
1 parent a5be8ac commit 18fd07f
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 4 deletions.
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import sonyflake.about as about


if __name__ == "__main__":
setup(
name=about.NAME,
Expand Down
6 changes: 3 additions & 3 deletions sonyflake/sonyflake.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import datetime
import ipaddress
from multiprocessing import Lock
from socket import gethostname, gethostbyname
from socket import gethostbyname, gethostname
from time import sleep
from typing import Callable, Dict, Optional
import datetime
import ipaddress

BIT_LEN_TIME = 39
BIT_LEN_SEQUENCE = 8
Expand Down
66 changes: 66 additions & 0 deletions tests/test_sonyflake.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import multiprocessing
from collections import Counter
from datetime import datetime, timedelta
from random import randint
from time import sleep
from unittest import TestCase

from sonyflake.sonyflake import (BIT_LEN_SEQUENCE, SonyFlake,
lower_16bit_private_ip)


class SonyFlakeTestCase(TestCase):
def setUp(self):
start_time = datetime.utcnow()
self.sf = SonyFlake(start_time)
self.start_time = SonyFlake.to_sonyflake_time(start_time)
self.machine_id = lower_16bit_private_ip()

@staticmethod
def _current_time():
return SonyFlake.to_sonyflake_time(datetime.utcnow())

@staticmethod
def _sleep(duration):
return sleep(duration / 100)

def test_sonyflake_once(self):
sleep_time = randint(1, 50)
self._sleep(sleep_time)
next_id = self.sf.next_id()
parts = SonyFlake.decompose(next_id)
self.assertEqual(parts["msb"], 0)
self.assertTrue(sleep_time <= parts["time"] <= sleep_time + 1)
self.assertEqual(parts["sequence"], 0)
self.assertEqual(parts["machine_id"], self.machine_id)

def test_sonyflake_future(self):
future_start_time = datetime.utcnow() + timedelta(minutes=1)
sonyflake = SonyFlake(start_time=future_start_time)
self.assertIsNone(sonyflake, "SonyFlake starting in the future")

def test_sonyflake_invalid_machine_id(self):
def check_machine_id(_: int) -> bool:
return False

sonyflake = SonyFlake(check_machine_id=check_machine_id)
self.assertIsNone(sonyflake, "Machine ID check failed")

def test_sonyflake_for_10sec(self):
last_id = 0
current = initial = self._current_time()
max_sequence = 0
while (current - initial) < 100:
next_id = self.sf.next_id()
parts = SonyFlake.decompose(next_id)
self.assertLess(last_id, next_id, "Duplicated id")
current = self._current_time()
last_id = next_id
self.assertEqual(parts["msb"], 0)
self.assertEqual(parts["machine_id"], self.machine_id)
overtime = self.start_time + (parts["time"] - current)
self.assertLessEqual(overtime, 0, "Unexpected overtime.")
max_sequence = max(max_sequence, parts["sequence"])
self.assertEqual(
max_sequence, (1 << BIT_LEN_SEQUENCE) - 1, "Unexpected max sequence"
)

0 comments on commit 18fd07f

Please sign in to comment.