Commit ea0ba99

Fix file_stream_provider example compilation failure on windows (#10975)
* Fix file_stream_provider on windows
* Add .await & fmt
1 parent 268f648 commit ea0ba99
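
The underlying problem: the example is built around a Unix FIFO (nix::unistd::mkfifo), which does not exist on Windows, so the example failed to compile there. The fix gates all FIFO code behind cfg(not(target_os = "windows")) and has main() dispatch per target. Below is a minimal sketch of that pattern, abridged from the diff that follows (the real module body is the full example; tokio and datafusion_common are existing dependencies of the examples crate):

// Sketch of the cfg-gating pattern this commit applies; abridged, not the
// full example. The Unix-only code lives in a module that only exists on
// non-Windows targets.
#[cfg(not(target_os = "windows"))]
mod non_windows {
    // In the real diff, the whole FIFO example is moved in here.
    pub async fn main() -> datafusion_common::Result<()> {
        Ok(())
    }
}

#[tokio::main]
async fn main() -> datafusion_common::Result<()> {
    #[cfg(target_os = "windows")]
    {
        println!("file_stream_provider example does not work on windows");
        Ok(())
    }
    #[cfg(not(target_os = "windows"))]
    {
        non_windows::main().await
    }
}

Because the two cfg blocks are mutually exclusive, exactly one of them survives compilation for any given target, and it sits in tail position, so each block can evaluate to Result<()> without an if/else.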

File tree

1 file changed: +171 -155 lines changed


datafusion-examples/examples/file_stream_provider.rs

Lines changed: 171 additions & 155 deletions
@@ -15,172 +15,188 @@
 // specific language governing permissions and limitations
 // under the License.

-use datafusion::assert_batches_eq;
-use datafusion_common::instant::Instant;
-use std::fs::{File, OpenOptions};
-use std::io::Write;
-use std::path::PathBuf;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
-use std::thread;
-use std::time::Duration;
-
-use arrow::datatypes::{DataType, Field, Schema};
-use arrow_schema::SchemaRef;
-use futures::StreamExt;
-use nix::sys::stat;
-use nix::unistd;
-use tempfile::TempDir;
-use tokio::task::JoinSet;
-
-use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
-use datafusion::datasource::TableProvider;
-use datafusion::prelude::{SessionConfig, SessionContext};
-use datafusion_common::{exec_err, Result};
-use datafusion_expr::Expr;
-
-// Number of lines written to FIFO
-const TEST_BATCH_SIZE: usize = 5;
-const TEST_DATA_SIZE: usize = 5;
-
-/// Makes a TableProvider for a fifo file using `StreamTable` with the `StreamProvider` trait
-fn fifo_table(
-    schema: SchemaRef,
-    path: impl Into<PathBuf>,
-    sort: Vec<Vec<Expr>>,
-) -> Arc<dyn TableProvider> {
-    let source = FileStreamProvider::new_file(schema, path.into())
-        .with_batch_size(TEST_BATCH_SIZE)
-        .with_header(true);
-    let config = StreamConfig::new(Arc::new(source)).with_order(sort);
-    Arc::new(StreamTable::new(Arc::new(config)))
-}
+#[cfg(not(target_os = "windows"))]
+mod non_windows {
+    use datafusion::assert_batches_eq;
+    use datafusion_common::instant::Instant;
+    use std::fs::{File, OpenOptions};
+    use std::io::Write;
+    use std::path::PathBuf;
+    use std::sync::atomic::{AtomicBool, Ordering};
+    use std::sync::Arc;
+    use std::thread;
+    use std::time::Duration;
+
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow_schema::SchemaRef;
+    use futures::StreamExt;
+    use nix::sys::stat;
+    use nix::unistd;
+    use tempfile::TempDir;
+    use tokio::task::JoinSet;
+
+    use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
+    use datafusion::datasource::TableProvider;
+    use datafusion::prelude::{SessionConfig, SessionContext};
+    use datafusion_common::{exec_err, Result};
+    use datafusion_expr::Expr;
+
+    // Number of lines written to FIFO
+    const TEST_BATCH_SIZE: usize = 5;
+    const TEST_DATA_SIZE: usize = 5;
+
+    /// Makes a TableProvider for a fifo file using `StreamTable` with the `StreamProvider` trait
+    fn fifo_table(
+        schema: SchemaRef,
+        path: impl Into<PathBuf>,
+        sort: Vec<Vec<Expr>>,
+    ) -> Arc<dyn TableProvider> {
+        let source = FileStreamProvider::new_file(schema, path.into())
+            .with_batch_size(TEST_BATCH_SIZE)
+            .with_header(true);
+        let config = StreamConfig::new(Arc::new(source)).with_order(sort);
+        Arc::new(StreamTable::new(Arc::new(config)))
+    }

-fn create_fifo_file(tmp_dir: &TempDir, file_name: &str) -> Result<PathBuf> {
-    let file_path = tmp_dir.path().join(file_name);
-    // Simulate an infinite environment via a FIFO file
-    if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) {
-        exec_err!("{}", e)
-    } else {
-        Ok(file_path)
+    fn create_fifo_file(tmp_dir: &TempDir, file_name: &str) -> Result<PathBuf> {
+        let file_path = tmp_dir.path().join(file_name);
+        // Simulate an infinite environment via a FIFO file
+        if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) {
+            exec_err!("{}", e)
+        } else {
+            Ok(file_path)
+        }
     }
-}

-fn write_to_fifo(
-    mut file: &File,
-    line: &str,
-    ref_time: Instant,
-    broken_pipe_timeout: Duration,
-) -> Result<()> {
-    // We need to handle broken pipe error until the reader is ready. This
-    // is why we use a timeout to limit the wait duration for the reader.
-    // If the error is different than broken pipe, we fail immediately.
-    while let Err(e) = file.write_all(line.as_bytes()) {
-        if e.raw_os_error().unwrap() == 32 {
-            let interval = Instant::now().duration_since(ref_time);
-            if interval < broken_pipe_timeout {
-                thread::sleep(Duration::from_millis(100));
-                continue;
+    fn write_to_fifo(
+        mut file: &File,
+        line: &str,
+        ref_time: Instant,
+        broken_pipe_timeout: Duration,
+    ) -> Result<()> {
+        // We need to handle broken pipe error until the reader is ready. This
+        // is why we use a timeout to limit the wait duration for the reader.
+        // If the error is different than broken pipe, we fail immediately.
+        while let Err(e) = file.write_all(line.as_bytes()) {
+            if e.raw_os_error().unwrap() == 32 {
+                let interval = Instant::now().duration_since(ref_time);
+                if interval < broken_pipe_timeout {
+                    thread::sleep(Duration::from_millis(100));
+                    continue;
+                }
             }
+            return exec_err!("{}", e);
         }
-        return exec_err!("{}", e);
+        Ok(())
     }
-    Ok(())
-}

-fn create_writing_thread(
-    file_path: PathBuf,
-    maybe_header: Option<String>,
-    lines: Vec<String>,
-    waiting_lock: Arc<AtomicBool>,
-    wait_until: usize,
-    tasks: &mut JoinSet<()>,
-) {
-    // Timeout for a long period of BrokenPipe error
-    let broken_pipe_timeout = Duration::from_secs(10);
-    let sa = file_path.clone();
-    // Spawn a new thread to write to the FIFO file
-    #[allow(clippy::disallowed_methods)] // spawn allowed only in tests
-    tasks.spawn_blocking(move || {
-        let file = OpenOptions::new().write(true).open(sa).unwrap();
-        // Reference time to use when deciding to fail the test
-        let execution_start = Instant::now();
-        if let Some(header) = maybe_header {
-            write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap();
-        }
-        for (cnt, line) in lines.iter().enumerate() {
-            while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
-                thread::sleep(Duration::from_millis(50));
+    fn create_writing_thread(
+        file_path: PathBuf,
+        maybe_header: Option<String>,
+        lines: Vec<String>,
+        waiting_lock: Arc<AtomicBool>,
+        wait_until: usize,
+        tasks: &mut JoinSet<()>,
+    ) {
+        // Timeout for a long period of BrokenPipe error
+        let broken_pipe_timeout = Duration::from_secs(10);
+        let sa = file_path.clone();
+        // Spawn a new thread to write to the FIFO file
+        #[allow(clippy::disallowed_methods)] // spawn allowed only in tests
+        tasks.spawn_blocking(move || {
+            let file = OpenOptions::new().write(true).open(sa).unwrap();
+            // Reference time to use when deciding to fail the test
+            let execution_start = Instant::now();
+            if let Some(header) = maybe_header {
+                write_to_fifo(&file, &header, execution_start, broken_pipe_timeout)
+                    .unwrap();
+            }
+            for (cnt, line) in lines.iter().enumerate() {
+                while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
+                    thread::sleep(Duration::from_millis(50));
+                }
+                write_to_fifo(&file, line, execution_start, broken_pipe_timeout).unwrap();
             }
-            write_to_fifo(&file, line, execution_start, broken_pipe_timeout).unwrap();
+            drop(file);
+        });
+    }
+
+    /// This example demonstrates a scanning against an Arrow data source (JSON) and
+    /// fetching results
+    pub async fn main() -> Result<()> {
+        // Create session context
+        let config = SessionConfig::new()
+            .with_batch_size(TEST_BATCH_SIZE)
+            .with_collect_statistics(false)
+            .with_target_partitions(1);
+        let ctx = SessionContext::new_with_config(config);
+        let tmp_dir = TempDir::new()?;
+        let fifo_path = create_fifo_file(&tmp_dir, "fifo_unbounded.csv")?;
+
+        let mut tasks: JoinSet<()> = JoinSet::new();
+        let waiting = Arc::new(AtomicBool::new(true));
+
+        let data_iter = 0..TEST_DATA_SIZE;
+        let lines = data_iter
+            .map(|i| format!("{},{}\n", i, i + 1))
+            .collect::<Vec<_>>();
+
+        create_writing_thread(
+            fifo_path.clone(),
+            Some("a1,a2\n".to_owned()),
+            lines.clone(),
+            waiting.clone(),
+            TEST_DATA_SIZE,
+            &mut tasks,
+        );
+
+        // Create schema
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a1", DataType::UInt32, false),
+            Field::new("a2", DataType::UInt32, false),
+        ]));
+
+        // Specify the ordering:
+        let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];
+
+        let provider = fifo_table(schema.clone(), fifo_path, order.clone());
+        ctx.register_table("fifo", provider)?;
+
+        let df = ctx.sql("SELECT * FROM fifo").await.unwrap();
+        let mut stream = df.execute_stream().await.unwrap();
+
+        let mut batches = Vec::new();
+        if let Some(Ok(batch)) = stream.next().await {
+            batches.push(batch)
         }
-        drop(file);
-    });
+
+        let expected = vec![
+            "+----+----+",
+            "| a1 | a2 |",
+            "+----+----+",
+            "| 0  | 1  |",
+            "| 1  | 2  |",
+            "| 2  | 3  |",
+            "| 3  | 4  |",
+            "| 4  | 5  |",
+            "+----+----+",
+        ];
+
+        assert_batches_eq!(&expected, &batches);
+
+        Ok(())
+    }
 }

-/// This example demonstrates a scanning against an Arrow data source (JSON) and
-/// fetching results
 #[tokio::main]
-async fn main() -> Result<()> {
-    // Create session context
-    let config = SessionConfig::new()
-        .with_batch_size(TEST_BATCH_SIZE)
-        .with_collect_statistics(false)
-        .with_target_partitions(1);
-    let ctx = SessionContext::new_with_config(config);
-    let tmp_dir = TempDir::new()?;
-    let fifo_path = create_fifo_file(&tmp_dir, "fifo_unbounded.csv")?;
-
-    let mut tasks: JoinSet<()> = JoinSet::new();
-    let waiting = Arc::new(AtomicBool::new(true));
-
-    let data_iter = 0..TEST_DATA_SIZE;
-    let lines = data_iter
-        .map(|i| format!("{},{}\n", i, i + 1))
-        .collect::<Vec<_>>();
-
-    create_writing_thread(
-        fifo_path.clone(),
-        Some("a1,a2\n".to_owned()),
-        lines.clone(),
-        waiting.clone(),
-        TEST_DATA_SIZE,
-        &mut tasks,
-    );
-
-    // Create schema
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("a1", DataType::UInt32, false),
-        Field::new("a2", DataType::UInt32, false),
-    ]));
-
-    // Specify the ordering:
-    let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];
-
-    let provider = fifo_table(schema.clone(), fifo_path, order.clone());
-    ctx.register_table("fifo", provider)?;
-
-    let df = ctx.sql("SELECT * FROM fifo").await.unwrap();
-    let mut stream = df.execute_stream().await.unwrap();
-
-    let mut batches = Vec::new();
-    if let Some(Ok(batch)) = stream.next().await {
-        batches.push(batch)
+async fn main() -> datafusion_common::Result<()> {
+    #[cfg(target_os = "windows")]
+    {
+        println!("file_stream_provider example does not work on windows");
+        Ok(())
+    }
+    #[cfg(not(target_os = "windows"))]
+    {
+        non_windows::main().await
     }
-
-    let expected = vec![
-        "+----+----+",
-        "| a1 | a2 |",
-        "+----+----+",
-        "| 0  | 1  |",
-        "| 1  | 2  |",
-        "| 2  | 3  |",
-        "| 3  | 4  |",
-        "| 4  | 5  |",
-        "+----+----+",
-    ];
-
-    assert_batches_eq!(&expected, &batches);
-
-    Ok(())
 }
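
One detail in write_to_fifo above worth noting: it retries on raw OS error 32, which is EPIPE on Linux. A variant sketch, not part of this commit, that expresses the same retry-until-the-reader-opens loop via std::io::ErrorKind::BrokenPipe instead of the magic errno, and avoids the unwrap() on raw_os_error():

use std::fs::File;
use std::io::{ErrorKind, Write};
use std::thread;
use std::time::{Duration, Instant};

// Alternative sketch, not from this commit: retry FIFO writes while the
// reader has not yet opened its end (BrokenPipe), up to a fixed timeout.
fn write_to_fifo_portable(
    mut file: &File,
    line: &str,
    ref_time: Instant,
    broken_pipe_timeout: Duration,
) -> std::io::Result<()> {
    while let Err(e) = file.write_all(line.as_bytes()) {
        // Keep retrying only for BrokenPipe and only within the time budget.
        if e.kind() == ErrorKind::BrokenPipe
            && Instant::now().duration_since(ref_time) < broken_pipe_timeout
        {
            thread::sleep(Duration::from_millis(100));
            continue;
        }
        return Err(e);
    }
    Ok(())
}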

0 commit comments
