Skip to content

Commit

Permalink
KAFKA-13929: Replace legacy File.createNewFile() with NIO.2 Files.cre…
Browse files Browse the repository at this point in the history
…ateFile() (apache#12197)


Reviewers: Mickael Maison <[email protected]>
  • Loading branch information
divijvaidya authored Jun 10, 2022
1 parent 4a06458 commit 0a50005
Show file tree
Hide file tree
Showing 15 changed files with 50 additions and 38 deletions.
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -936,8 +936,8 @@ class LogLoaderTest {

// The files remain absent until we first access it because we are doing lazy loading for time index and offset index
// files but in this test case we need to create these files in order to test we will remove them.
bogusIndex2.createNewFile()
bogusTimeIndex2.createNewFile()
Files.createFile(bogusIndex2.toPath)
Files.createFile(bogusTimeIndex2.toPath)

def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 1)
Expand Down
5 changes: 3 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogTestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTr
import org.apache.kafka.common.utils.{Time, Utils}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}

import java.nio.file.Files
import scala.collection.Iterable
import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -142,8 +143,8 @@ object LogTestUtils {
segment.append(MemoryRecords.withRecords(baseOffset + Int.MaxValue - 1, CompressionType.NONE, 0,
record(baseOffset + Int.MaxValue - 1)))
// Need to create the offset files explicitly to avoid triggering segment recovery to truncate segment.
UnifiedLog.offsetIndexFile(logDir, baseOffset).createNewFile()
UnifiedLog.timeIndexFile(logDir, baseOffset).createNewFile()
Files.createFile(UnifiedLog.offsetIndexFile(logDir, baseOffset).toPath)
Files.createFile(UnifiedLog.timeIndexFile(logDir, baseOffset).toPath)
baseOffset + Int.MaxValue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -975,9 +975,9 @@ class ProducerStateManagerTest {
// the broker shutdown cleanly and emitted a snapshot file larger than the base offset of the active segment.

// Create 3 snapshot files at different offsets.
UnifiedLog.producerSnapshotFile(logDir, 5).createNewFile() // not stray
UnifiedLog.producerSnapshotFile(logDir, 2).createNewFile() // stray
UnifiedLog.producerSnapshotFile(logDir, 42).createNewFile() // not stray
Files.createFile(UnifiedLog.producerSnapshotFile(logDir, 5).toPath) // not stray
Files.createFile(UnifiedLog.producerSnapshotFile(logDir, 2).toPath) // stray
Files.createFile(UnifiedLog.producerSnapshotFile(logDir, 42).toPath) // not stray

// claim that we only have one segment with a base offset of 5
stateManager.removeStraySnapshots(Seq(5))
Expand All @@ -995,9 +995,9 @@ class ProducerStateManagerTest {
// Snapshots associated with an offset in the list of segment base offsets should remain.

// Create 3 snapshot files at different offsets.
UnifiedLog.producerSnapshotFile(logDir, 5).createNewFile() // stray
UnifiedLog.producerSnapshotFile(logDir, 2).createNewFile() // stray
UnifiedLog.producerSnapshotFile(logDir, 42).createNewFile() // not stray
Files.createFile(UnifiedLog.producerSnapshotFile(logDir, 5).toPath) // stray
Files.createFile(UnifiedLog.producerSnapshotFile(logDir, 2).toPath) // stray
Files.createFile(UnifiedLog.producerSnapshotFile(logDir, 42).toPath) // not stray

stateManager.removeStraySnapshots(Seq(42))
assertEquals(Seq(42), ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
Expand All @@ -1009,7 +1009,7 @@ class ProducerStateManagerTest {
*/
@Test
def testRemoveAndMarkSnapshotForDeletion(): Unit = {
UnifiedLog.producerSnapshotFile(logDir, 5).createNewFile()
Files.createFile(UnifiedLog.producerSnapshotFile(logDir, 5).toPath)
val manager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs, maxProducerIdExpirationMs, time)
assertTrue(manager.latestSnapshotOffset.isDefined)
val snapshot = manager.removeAndMarkSnapshotForDeletion(5).get
Expand All @@ -1027,7 +1027,7 @@ class ProducerStateManagerTest {
@Test
def testRemoveAndMarkSnapshotForDeletionAlreadyDeleted(): Unit = {
val file = UnifiedLog.producerSnapshotFile(logDir, 5)
file.createNewFile()
Files.createFile(file.toPath)
val manager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs, maxProducerIdExpirationMs, time)
assertTrue(manager.latestSnapshotOffset.isDefined)
Files.delete(file.toPath)
Expand Down
10 changes: 5 additions & 5 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ class UnifiedLogTest {

def createEmptyLogs(dir: File, offsets: Int*): Unit = {
for(offset <- offsets) {
UnifiedLog.logFile(dir, offset).createNewFile()
UnifiedLog.offsetIndexFile(dir, offset).createNewFile()
Files.createFile(UnifiedLog.logFile(dir, offset).toPath)
Files.createFile(UnifiedLog.offsetIndexFile(dir, offset).toPath)
}
}

Expand Down Expand Up @@ -2413,8 +2413,8 @@ class UnifiedLogTest {
private def testDegenerateSplitSegmentWithOverflow(segmentBaseOffset: Long, records: List[MemoryRecords]): Unit = {
val segment = LogTestUtils.rawSegment(logDir, segmentBaseOffset)
// Need to create the offset files explicitly to avoid triggering segment recovery to truncate segment.
UnifiedLog.offsetIndexFile(logDir, segmentBaseOffset).createNewFile()
UnifiedLog.timeIndexFile(logDir, segmentBaseOffset).createNewFile()
Files.createFile(UnifiedLog.offsetIndexFile(logDir, segmentBaseOffset).toPath)
Files.createFile(UnifiedLog.timeIndexFile(logDir, segmentBaseOffset).toPath)
records.foreach(segment.append _)
segment.close()

Expand Down Expand Up @@ -3418,7 +3418,7 @@ class UnifiedLogTest {
// Delete the underlying directory to trigger a KafkaStorageException
val dir = log.dir
Utils.delete(dir)
dir.createNewFile()
Files.createFile(dir.toPath)

assertThrows(classOf[KafkaStorageException], () => {
log.delete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.kafka.common.utils.Utils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}

import java.nio.file.Files
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -84,7 +85,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
val logDir = new File(kafkaConfig.logDirs.head)
// Make log directory of the partition on the leader broker inaccessible by replacing it with a file
CoreUtils.swallow(Utils.delete(logDir), this)
logDir.createNewFile()
Files.createFile(logDir.toPath)
assertTrue(logDir.isFile)

server = TestUtils.createServer(kafkaConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2597,7 +2597,7 @@ class ReplicaManagerTest {
// Delete the underlying directory to trigger an KafkaStorageException
val dir = log.dir.getParentFile
Utils.delete(dir)
dir.createNewFile()
Files.createFile(dir.toPath)
}

val partitionStates = Map(tp0 -> new StopReplicaPartitionState()
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1464,7 +1464,7 @@ object TestUtils extends Logging {
val localLog = leaderBroker.replicaManager.localLogOrException(partition)
val logDir = localLog.dir.getParentFile
CoreUtils.swallow(Utils.delete(logDir), this)
logDir.createNewFile()
Files.createFile(logDir.toPath)
assertTrue(logDir.isFile)

if (failureType == Roll) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
Expand Down Expand Up @@ -70,8 +71,11 @@ public class RemoteLogMetadataSnapshotFile {

// Create an empty file if it does not exist.
try {
boolean newFileCreated = metadataStoreFile.createNewFile();
log.info("Remote log metadata snapshot file: [{}], newFileCreated: [{}]", metadataStoreFile, newFileCreated);
final boolean fileExists = Files.exists(metadataStoreFile.toPath());
if (!fileExists) {
Files.createFile(metadataStoreFile.toPath());
}
log.info("Remote log metadata snapshot file: [{}], newFileCreated: [{}]", metadataStoreFile, !fileExists);
} catch (IOException e) {
throw new KafkaException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -258,7 +259,8 @@ public void testNotCleanUpStateDirIfNotEmpty() throws InterruptedException {
assertTrue(appDir.exists()); // Application state directory Exists

try {
assertTrue((new File(appDir, "dummy")).createNewFile());
final File dummyFile = new File(appDir, "dummy");
Files.createFile(dummyFile.toPath());
} catch (final IOException e) {
throw new RuntimeException("Failed to create dummy file.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOExcept
// set readonly to the CHECKPOINT_FILE_NAME.tmp file because we will write data to the .tmp file first
// and then swap to CHECKPOINT_FILE_NAME by replacing it
final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
file.createNewFile();
Files.createFile(file.toPath());
file.setWritable(false);

try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -813,7 +814,7 @@ public void shouldThrowIfLoadCheckpointThrows() throws Exception {

stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback, null);
final File file = new File(stateMgr.baseDir(), CHECKPOINT_FILE_NAME);
file.createNewFile();
Files.createFile(file.toPath());
final FileWriter writer = new FileWriter(file);
writer.write("abcdefg");
writer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public void shouldThrowProcessorStateExceptionIfStateDirOccupied() throws IOExce

// Replace application's stateDir to regular file
Utils.delete(appDir);
appDir.createNewFile();
Files.createFile(appDir.toPath());

assertThrows(ProcessorStateException.class, () -> directory.getOrCreateDirectoryForTask(taskId));
}
Expand All @@ -253,7 +253,7 @@ public void shouldThrowProcessorStateExceptionIfTestDirOccupied() throws IOExcep
// Replace taskDir to a regular file
final File taskDir = new File(appDir, toTaskDirString(taskId));
Utils.delete(taskDir);
taskDir.createNewFile();
Files.createFile(taskDir.toPath());

// Error: ProcessorStateException should be thrown.
assertThrows(ProcessorStateException.class, () -> directory.getOrCreateDirectoryForTask(taskId));
Expand Down Expand Up @@ -391,8 +391,8 @@ public void shouldReturnEmptyArrayIfListFilesReturnsNull() throws IOException {

// make sure the File#listFiles returns null and StateDirectory#listAllTaskDirectories is able to handle null
Utils.delete(appDir);
assertTrue(appDir.createNewFile());
assertTrue(appDir.exists());
Files.createFile(appDir.toPath());
assertTrue(Files.exists(appDir.toPath()));
assertNull(appDir.listFiles());
assertEquals(0, directory.listAllTaskDirectories().size());
}
Expand Down Expand Up @@ -571,7 +571,7 @@ public void shouldNotDeleteAppDirWhenCleanUpIfNotEmpty() throws IOException {

// Create a dummy file in appDir; for this, appDir will not be empty after cleanup.
final File dummyFile = new File(appDir, "dummy");
assertTrue(dummyFile.createNewFile());
Files.createFile(dummyFile.toPath());

try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
// call StateDirectory#clean
Expand Down Expand Up @@ -791,7 +791,7 @@ public void shouldGetFreshProcessIdIfProcessFileDeleted() {
@Test
public void shouldGetFreshProcessIdIfJsonUnreadable() throws Exception {
final File processFile = new File(appDir, PROCESS_FILE_NAME);
assertThat(processFile.createNewFile(), is(true));
Files.createFile(processFile.toPath());
final UUID processId = UUID.randomUUID();

final FileOutputStream fileOutputStream = new FileOutputStream(processFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

import java.nio.file.Files;
import java.util.ArrayList;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
Expand Down Expand Up @@ -3275,7 +3276,7 @@ private void makeTaskFolders(final String... names) throws Exception {

private void writeCheckpointFile(final TaskId task, final Map<TopicPartition, Long> offsets) throws Exception {
final File checkpointFile = getCheckpointFile(task);
assertThat(checkpointFile.createNewFile(), is(true));
Files.createFile(checkpointFile.toPath());
new OffsetCheckpoint(checkpointFile).write(offsets);
expect(stateDirectory.checkpointFileFor(task)).andReturn(checkpointFile);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.junit.Test;

import java.io.File;
import java.nio.file.Files;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -303,7 +304,7 @@ public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exc
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + "-" + formatter.format(new Date(segmentId * segmentInterval)));
//noinspection ResultOfMethodCallIgnored
oldSegment.createNewFile();
Files.createFile(oldSegment.toPath());
}

segments.openExisting(context, -1L);
Expand All @@ -325,7 +326,7 @@ public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Ex
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1)));
//noinspection ResultOfMethodCallIgnored
oldSegment.createNewFile();
Files.createFile(oldSegment.toPath());
}

segments.openExisting(context, -1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.junit.Test;

import java.io.File;
import java.nio.file.Files;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -304,15 +305,15 @@ public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exc
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + "-" + formatter.format(new Date(segmentId * segmentInterval)));
//noinspection ResultOfMethodCallIgnored
oldSegment.createNewFile();
Files.createFile(oldSegment.toPath());
}

segments.openExisting(context, -1L);

for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
final String segmentName = storeName + "." + (long) segmentId * segmentInterval;
final File newSegment = new File(storeDirectoryPath + File.separator + segmentName);
assertTrue(newSegment.exists());
assertTrue(Files.exists(newSegment.toPath()));
}
}

Expand All @@ -326,14 +327,14 @@ public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Ex
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1)));
//noinspection ResultOfMethodCallIgnored
oldSegment.createNewFile();
Files.createFile(oldSegment.toPath());
}

segments.openExisting(context, -1L);

for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1)));
assertTrue(newSegment.exists());
assertTrue(Files.exists(newSegment.toPath()));
}
}

Expand Down

0 comments on commit 0a50005

Please sign in to comment.